[Zope-Checkins] SVN: Zope/trunk/src/Products/ZCatalog/ Fill in more tests for the plan

Hanno Schlichting hannosch at hannosch.eu
Thu Aug 5 16:10:41 EDT 2010


Log message for revision 115510:
  Fill in more tests for the plan
  

Changed:
  U   Zope/trunk/src/Products/ZCatalog/plan.py
  U   Zope/trunk/src/Products/ZCatalog/tests/test_plan.py

-=-
Modified: Zope/trunk/src/Products/ZCatalog/plan.py
===================================================================
--- Zope/trunk/src/Products/ZCatalog/plan.py	2010-08-05 19:16:37 UTC (rev 115509)
+++ Zope/trunk/src/Products/ZCatalog/plan.py	2010-08-05 20:10:41 UTC (rev 115510)
@@ -251,7 +251,7 @@
         self.init_timer()
         self.start_time = time.time()
 
-    def start_split(self, name, result=None):
+    def start_split(self, name):
         self.interim[name] = Duration(time.time(), None)
 
     def stop_split(self, name, result=None, limit=False):

Modified: Zope/trunk/src/Products/ZCatalog/tests/test_plan.py
===================================================================
--- Zope/trunk/src/Products/ZCatalog/tests/test_plan.py	2010-08-05 19:16:37 UTC (rev 115509)
+++ Zope/trunk/src/Products/ZCatalog/tests/test_plan.py	2010-08-05 20:10:41 UTC (rev 115510)
@@ -12,6 +12,7 @@
 ##############################################################################
 
 import os
+import time
 import unittest
 
 from zope.testing import cleanup
@@ -194,26 +195,90 @@
 
     def setUp(self):
         cleanup.CleanUp.setUp(self)
+        from Products.ZCatalog.Catalog import Catalog
+        self.cat = Catalog('catalog')
+
+    def _makeOne(self, catalog=None, query=None):
+        from ..plan import CatalogPlan
+        if catalog is None:
+            catalog = self.cat
+        return CatalogPlan(catalog, query=query)
+
+    def test_get_id(self):
+        plan = self._makeOne()
+        self.assertEquals(plan.get_id(), ('', 'NonPersistentCatalog'))
+
+    def test_get_id_persistent(self):
         from Products.ZCatalog.ZCatalog import ZCatalog
-        self.zcat = ZCatalog('catalog')
-        self.zcat.long_query_time = 0.0
-        self.zcat.addIndex('num', 'FieldIndex')
-        self.zcat.addIndex('big', 'FieldIndex')
-        self.zcat.addIndex('numbers', 'KeywordIndex')
+        zcat = ZCatalog('catalog')
+        plan = self._makeOne(zcat._catalog)
+        self.assertEquals(plan.get_id(), ('catalog',))
 
-        for i in range(9):
-            obj = dummy(i)
-            self.zcat.catalog_object(obj, str(i))
+    def test_plan_empty(self):
+        plan = self._makeOne()
+        self.assertEquals(plan.plan(), None)
 
-    # get_id
-    # init_timer
-    # plan
-    # start
-    # start_split
-    # stop_split
-    # stop
-    # log
+    def test_start(self):
+        plan = self._makeOne()
+        plan.start()
+        self.assert_(plan.start_time <= time.time())
 
+    def test_start_split(self):
+        plan = self._makeOne()
+        plan.start_split('index1')
+        self.assert_('index1' in plan.interim)
+
+    def test_stop_split(self):
+        plan = self._makeOne()
+        plan.start_split('index1')
+        plan.stop_split('index1')
+        self.assert_('index1' in plan.interim)
+        i1 = plan.interim['index1']
+        self.assert_(i1.start <= i1.end)
+        self.assert_('index1' in plan.benchmark)
+
+    def test_stop_split_sort_on(self):
+        plan = self._makeOne()
+        plan.start_split('sort_on')
+        plan.stop_split('sort_on')
+        self.assert_('sort_on' in plan.interim)
+        so = plan.interim['sort_on']
+        self.assert_(so.start <= so.end)
+        self.assert_('sort_on' not in plan.benchmark)
+
+    def test_stop(self):
+        plan = self._makeOne(query={'index1': 1, 'index2': 2})
+        plan.start()
+        plan.start_split('index1')
+        plan.stop_split('index1')
+        plan.start_split('index1')
+        plan.stop_split('index1')
+        plan.start_split('sort_on')
+        plan.stop_split('sort_on')
+        plan.stop()
+
+        self.assert_(plan.duration > 0)
+        self.assert_('index1' in plan.benchmark)
+        self.assertEquals(plan.benchmark['index1'].hits, 2)
+        self.assert_('index2' in plan.benchmark)
+        self.assertEquals(plan.benchmark['index2'].hits, 0)
+        self.assertEquals(set(plan.plan()), set(('index1', 'index2')))
+
+    def test_log(self):
+        plan = self._makeOne(query={'index1': 1})
+        plan.threshold = 0.0
+        plan.start()
+        plan.start_split('index1')
+        plan.stop_split('index1')
+        plan.stop()
+        plan.log()
+        report = plan.report()
+        self.assertEquals(len(report), 1)
+        self.assertEquals(report[0]['counter'], 2)
+        plan.reset()
+        self.assertEquals(len(plan.report()), 0)
+
+
 class TestCatalogReport(cleanup.CleanUp, unittest.TestCase):
 
     def setUp(self):



More information about the Zope-Checkins mailing list