[Zope-Checkins] SVN: Zope/trunk/src/Products/ZCatalog/ Merge in the essential parts of unimr's queryplan

Hanno Schlichting hannosch at hannosch.eu
Sun Aug 1 16:26:37 EDT 2010


Log message for revision 115349:
  Merge in the essential parts of unimr's queryplan
  

Changed:
  U   Zope/trunk/src/Products/ZCatalog/Catalog.py
  U   Zope/trunk/src/Products/ZCatalog/report.py
  U   Zope/trunk/src/Products/ZCatalog/tests/test_catalog.py

-=-
Modified: Zope/trunk/src/Products/ZCatalog/Catalog.py
===================================================================
--- Zope/trunk/src/Products/ZCatalog/Catalog.py	2010-08-01 19:17:49 UTC (rev 115348)
+++ Zope/trunk/src/Products/ZCatalog/Catalog.py	2010-08-01 20:26:37 UTC (rev 115349)
@@ -482,7 +482,7 @@
                 continue
             order.append((ILimitedResultIndex.providedBy(index), name))
         order.sort()
-        return order
+        return [i[1] for i in order]
 
     def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
         """Iterate through the indexes, applying the query to each one. If
@@ -506,17 +506,22 @@
 
         # Canonicalize the request into a sensible query before passing it on
         query = self.make_query(query)
+
         cr = self.getCatalogReport(query)
         cr.start()
 
-        for limit_result, i in self._sorted_search_indexes(query):
+        plan = cr.plan()
+        if not plan:
+            plan = self._sorted_search_indexes(query)
+
+        for i in plan:
             index = self.getIndex(i)
             _apply_index = getattr(index, "_apply_index", None)
             if _apply_index is None:
                 continue
 
-            cr.split(i)
-            if limit_result:
+            cr.start_split(i)
+            if ILimitedResultIndex.providedBy(index):
                 r = _apply_index(query, rs)
             else:
                 r = _apply_index(query)
@@ -528,14 +533,15 @@
                 # once we don't need to support the "return everything" case
                 # anymore
                 if r is not None and not r:
-                    cr.split(i, None)
+                    cr.stop_split(i, None)
                     return LazyCat([])
-                cr.split(i, r)
+
+                cr.stop_split(i, r)
                 w, rs = weightedIntersection(rs, r)
                 if not rs:
                     break
             else:
-                cr.split(i, None)
+                cr.stop_split(i, None)
 
         cr.stop()
 

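For context, here is a minimal standalone sketch (not the Zope code itself) of
the fallback ordering that _sorted_search_indexes now produces: indexes whose
implementation provides ILimitedResultIndex sort after the others, and only the
index names are returned, which is what search() iterates over when the catalog
report has no stored plan yet. The "limited" set below is an assumption standing
in for the real interface check.

    def sorted_search_indexes(query_keys, limited):
        # query_keys: index names mentioned in the query
        # limited: names whose index provides ILimitedResultIndex (assumed here)
        order = [(name in limited, name) for name in query_keys]
        order.sort()  # False sorts before True, so limited indexes come last
        return [name for _, name in order]

    print(sorted_search_indexes(['att1', 'att2', 'col2'], limited={'att1'}))
    # -> ['att2', 'col2', 'att1']; the limited-result index is applied last
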
Modified: Zope/trunk/src/Products/ZCatalog/report.py
===================================================================
--- Zope/trunk/src/Products/ZCatalog/report.py	2010-08-01 19:17:49 UTC (rev 115348)
+++ Zope/trunk/src/Products/ZCatalog/report.py	2010-08-01 20:26:37 UTC (rev 115349)
@@ -25,6 +25,7 @@
 value_indexes = frozenset()
 
 MAX_DISTINCT_VALUES = 10
+REFRESH_RATE = 100
 
 
 def determine_value_indexes(indexes):
@@ -101,67 +102,121 @@
     return key
 
 
-class StopWatch(object):
-    """ Simple stopwatch class """
+class CatalogReport(object):
+    """Catalog report class to meassure and identify catalog queries.
+    """
 
-    def __init__(self):
+    def __init__(self, catalog, query=None, threshold=0.1):
         self.init()
+        self.catalog = catalog
+        self.query = query
+        self._key = None
+        self.threshold = threshold
 
+        parent = aq_parent(catalog)
+        path = getattr(aq_base(parent), 'getPhysicalPath', None)
+        if path is None:
+            path = ('', 'NonPersistentCatalog')
+        else:
+            path = tuple(parent.getPhysicalPath())
+        self.cid = path
+
     def init(self):
         self.res = []
         self.start_time = None
         self.interim = {}
         self.stop_time = None
+        self.duration = None
 
+    def prioritymap(self):
+        # maps each query key to the benchmark of its indexes
+        prioritymap = getattr(self.catalog, '_v_prioritymap', None)
+        if prioritymap is None:
+            prioritymap = self.catalog._v_prioritymap = {}
+        return prioritymap
+
+    def benchmark(self):
+        # holds the benchmark of each index
+        return self.prioritymap().get(self.key(), None)
+
+    def plan(self):
+        benchmark = self.benchmark()
+        if not benchmark:
+            return None
+
+        # sort indexes on (mean hits, mean search time)
+        ranking = [((v[0], v[1]), k) for k, v in benchmark.items()]
+        ranking.sort()
+        return [i[1] for i in ranking]
+
     def start(self):
         self.init()
         self.start_time = time.time()
+        benchmark = self.benchmark()
+        if benchmark is None:
+            self.prioritymap()[self.key()] = {}
 
-    def split(self, label, result=None):
+    def start_split(self, label, result=None):
+        self.interim[label] = (time.time(), None)
+
+    def stop_split(self, name, result=None):
         current = time.time()
-        start_time, stop_time = self.interim.get(label, (None, None))
-
-        if start_time is None:
-            self.interim[label] = (current, None)
-            return
-
+        start_time, stop_time = self.interim.get(name, (None, None))
         length = 0
         if result is not None:
             # TODO: calculating the length can be expensive
             length = len(result)
-        self.interim[label] = (start_time, current)
-        self.res.append((label, current - start_time, length))
+        self.interim[name] = (start_time, current)
+        dt = current - start_time
+        self.res.append((name, dt, length))
 
+        # remember index's hits, search time and calls
+        benchmark = self.benchmark()
+        if name not in benchmark:
+            benchmark[name] = (length, dt, 1)
+        else:
+            n, t, c = benchmark[name]
+            n = int(((n*c) + length) / float(c + 1))
+            t = ((t*c) + dt) / float(c + 1)
+            # reset the weighting every REFRESH_RATE calls so the average can adapt
+            if c % REFRESH_RATE == 0:
+                c = 0
+            c += 1
+            benchmark[name] = (n, t, c)
+
     def stop(self):
         self.end_time = time.time()
+        self.duration = self.end_time - self.start_time
 
-    def result(self):
-        return (self.end_time - self.start_time, tuple(self.res))
+        key = self.key()
+        benchmark = self.benchmark()
+        prioritymap = self.prioritymap()
+        prioritymap[key] = benchmark
 
+        # calculate mean time of search
+        stats = getattr(self.catalog, '_v_stats', None)
+        if stats is None:
+            stats = self.catalog._v_stats = {}
 
-class CatalogReport(StopWatch):
-    """Catalog report class to meassure and identify catalog queries.
-    """
+        if key not in stats:
+            mt = self.duration
+            c = 1
+        else:
+            mt, c = stats[key]
+            mt = ((mt * c) + self.duration) / float(c + 1)
+            c += 1
 
-    def __init__(self, catalog, query=None, threshold=0.1):
-        super(CatalogReport, self).__init__()
+        stats[key] = (mt, c)
+        self.log()
 
-        self.catalog = catalog
-        self.query = query
-        self.threshold = threshold
+    def result(self):
+        return (self.duration, tuple(self.res))
 
-        parent = aq_parent(catalog)
-        path = getattr(aq_base(parent), 'getPhysicalPath', None)
-        if path is None:
-            path = ('', 'NonPersistentCatalog')
-        else:
-            path = tuple(parent.getPhysicalPath())
-        self.cid = path
+    def key(self):
+        if not self._key:
+            self._key = make_key(self.catalog, self.query)
+        return self._key
 
-    def stop(self):
-        super(CatalogReport, self).stop()
-        self.log()
-
     def log(self):
         # result of stopwatch
         res = self.result()
@@ -171,7 +226,7 @@
         # The key calculation takes a bit itself, we want to avoid that for
         # any fast queries. This does mean that slow queries get the key
         # calculation overhead added to their runtime.
-        key = make_key(self.catalog, self.query)
+        key = self.key()
 
         reports_lock.acquire()
         try:

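A toy illustration (with made-up numbers and a hypothetical helper named
update_benchmark that is not part of report.py) of the bookkeeping stop_split
performs per index: mean hits and mean search time are folded into a running
average, and the call counter wraps at REFRESH_RATE so the stored values can
re-adapt when the data changes.

    REFRESH_RATE = 100

    def update_benchmark(entry, hits, duration):
        # entry is (mean hits, mean search time, calls) or None on first use
        if entry is None:
            return (hits, duration, 1)
        n, t, c = entry
        n = int(((n * c) + hits) / float(c + 1))
        t = ((t * c) + duration) / float(c + 1)
        if c % REFRESH_RATE == 0:
            c = 0  # drop the old weight so fresh measurements dominate again
        return (n, t, c + 1)

    entry = None
    for hits, duration in [(120, 0.02), (80, 0.01), (100, 0.015)]:
        entry = update_benchmark(entry, hits, duration)
    print(entry)  # roughly (100, 0.015, 3)
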
Modified: Zope/trunk/src/Products/ZCatalog/tests/test_catalog.py
===================================================================
--- Zope/trunk/src/Products/ZCatalog/tests/test_catalog.py	2010-08-01 19:17:49 UTC (rev 115348)
+++ Zope/trunk/src/Products/ZCatalog/tests/test_catalog.py	2010-08-01 20:26:37 UTC (rev 115349)
@@ -290,19 +290,18 @@
 
     def test_sorted_search_indexes_one(self):
         result = self._catalog._sorted_search_indexes({'att1': 'a'})
-        self.assertEquals(result, [(True, 'att1')])
+        self.assertEquals(result, ['att1'])
 
     def test_sorted_search_indexes_many(self):
         query = {'att1': 'a', 'att2': 'b', 'num': 1}
         result = self._catalog._sorted_search_indexes(query)
-        indexes = [r[1] for r in result]
-        self.assertEquals(set(indexes), set(['att1', 'att2', 'num']))
+        self.assertEquals(set(result), set(['att1', 'att2', 'num']))
 
     def test_sorted_search_indexes_priority(self):
         # att2 and col2 don't support ILimitedResultIndex, att1 does
         query = {'att1': 'a', 'att2': 'b', 'col2': 'c'}
         result = self._catalog._sorted_search_indexes(query)
-        self.assertEquals(result.index((True, 'att1')), 2)
+        self.assertEquals(result.index('att1'), 2)
 
     # search
     # sortResults

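Finally, how the stored benchmark feeds back into query ordering: plan() ranks
the benchmark entries on (mean hits, mean search time), so on later queries
with the same key the most selective and cheapest indexes are applied first,
shrinking the result set early. A small sketch with invented benchmark values:

    benchmark = {
        # name: (mean hits, mean search time, calls)
        'path': (5000, 0.040, 12),
        'portal_type': (300, 0.005, 12),
        'review_state': (300, 0.002, 12),
    }

    ranking = sorted(((hits, t), name) for name, (hits, t, _) in benchmark.items())
    plan = [name for _, name in ranking]
    print(plan)  # -> ['review_state', 'portal_type', 'path']
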

