[Checkins] SVN: zc.catalogqueue/trunk/src/zc/catalogqueue/ track the number of pending cataloging events

Fred L. Drake, Jr. fdrake at gmail.com
Fri May 9 13:58:51 EDT 2008


Log message for revision 86576:
  track the number of pending cataloging events

Changed:
  U   zc.catalogqueue/trunk/src/zc/catalogqueue/CHANGES.txt
  U   zc.catalogqueue/trunk/src/zc/catalogqueue/CatalogEventQueue.py
  U   zc.catalogqueue/trunk/src/zc/catalogqueue/interfaces.py
  U   zc.catalogqueue/trunk/src/zc/catalogqueue/queue.py
  U   zc.catalogqueue/trunk/src/zc/catalogqueue/queue.txt

-=-
Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/CHANGES.txt
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/CHANGES.txt	2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/CHANGES.txt	2008-05-09 17:58:50 UTC (rev 86576)
@@ -6,10 +6,13 @@
 
 Added some processing state information as attributes of the queue object:
 
-``lastProcessedTime``
+``ICatalogQueue.__len__()``
+  Return the number of pending cataloging events.
+
+``ICatalogQueue.lastProcessedTime``
   Time of the last successful processing.
 
-``totalProcessed``
+``ICatalogQueue.totalProcessed``
   Total number of cataloging events processed.
 
 

Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/CatalogEventQueue.py
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/CatalogEventQueue.py	2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/CatalogEventQueue.py	2008-05-09 17:58:50 UTC (rev 86576)
@@ -133,6 +133,7 @@
         data = self._data
         current = data.get(uid)
         if current is not None:
+            delta = 0
             generation, current = current
             if current in ADDED_EVENTS and etype is ADDED:
                 raise TypeError("Attempt to add an object that is already "
@@ -146,11 +147,13 @@
                 etype = CHANGED_ADDED
                 
         else:
+            delta = 1
             generation = 0
 
         data[uid] = generation+1, etype
 
         self._p_changed = 1
+        return delta
 
     def getEvent(self, uid):
         state = self._data.get(uid)

Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/interfaces.py
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/interfaces.py	2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/interfaces.py	2008-05-09 17:58:50 UTC (rev 86576)
@@ -37,6 +37,9 @@
         zope.index.interfaces.IInjection objects to be updated.
         """
 
+    def __len__():
+        """Return the number of unprocessed cataloging events."""
+
     lastProcessedTime = zope.interface.Attribute(
         """Time the queue was last processed.
 

Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/queue.py
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/queue.py	2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/queue.py	2008-05-09 17:58:50 UTC (rev 86576)
@@ -14,6 +14,7 @@
 
 from zc.catalogqueue.CatalogEventQueue import REMOVED, CHANGED, ADDED
 
+import BTrees.Length
 import datetime
 import logging
 import persistent
@@ -35,13 +36,18 @@
     _buckets = 1009 # Maybe configurable someday
 
     def __init__(self):
+        self._length = BTrees.Length.Length()
         self._queues = [
             zc.catalogqueue.CatalogEventQueue.CatalogEventQueue()
             for i in range(self._buckets)
             ]
 
+    def __len__(self):
+        return self._length()
+
     def _notify(self, id, event):
-        self._queues[hash(id) % self._buckets].update(id, event)
+        self._length.change(
+            self._queues[hash(id) % self._buckets].update(id, event))
 
     def add(self, id):
         self._notify(id, ADDED)
@@ -67,6 +73,7 @@
                         for catalog in catalogs:
                             catalog.index_doc(id, ob)
                 done += 1
+                self._length.change(-1)
 
             if done >= limit:
                 break

Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/queue.txt
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/queue.txt	2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/queue.txt	2008-05-09 17:58:50 UTC (rev 86576)
@@ -25,6 +25,12 @@
     >>> queue.totalProcessed
     0
 
+The length of the queue provides access to the number of pending cataloging
+events:
+
+    >>> len(queue)
+    0
+
 Queues are used in 2 ways.  As content are modified, we call add,
 update, and remove methods on the queue:
 
@@ -56,12 +62,15 @@
     >>> queue.update(0)
 
 At this point, we've added several events, but haven't processed the queue, so
-we expect the ``lastProcessedTime`` and ``totalProcessed`` to be unchanged:
+we expect ``lastProcessedTime`` and ``totalProcessed`` to be unchanged, but the
+queue length to reflect the pending tasks:
 
     >>> print queue.lastProcessedTime
     None
     >>> queue.totalProcessed
     0
+    >>> len(queue)
+    6
 
 Periodically, we call process on the queue.  We need to pass an ids
 object and a collection of injection (catalog) objects:
@@ -114,6 +123,11 @@
 
     >>> previous_time = queue.lastProcessedTime
 
+The length of the queue now indicates that no further events are pending:
+
+    >>> len(queue)
+    0
+
 If we process the queue without additional events, we'll just get 0
 back:
 
@@ -129,12 +143,15 @@
     >>> queue.totalProcessed
     6
 
-    >>> previous_time = queue.lastProcessedTime
+    >>> len(queue)
+    0
 
 Of course, the limit argument limits how many events we process:
 
     >>> for i in range(10):
     ...     queue.update(i)
+    >>> len(queue)
+    10
     >>> queue.process(Ids(), [Injection('cat1')], 5)
     cat1 indexing 1 object 1
     cat1 indexing 2 object 2
@@ -143,6 +160,8 @@
     5
     >>> queue.totalProcessed
     11
+    >>> len(queue)
+    5
 
     >>> queue.process(Ids(), [Injection('cat1')], 5)
     cat1 indexing 5 object 5
@@ -153,6 +172,8 @@
     5
     >>> queue.totalProcessed
     16
+    >>> len(queue)
+    0
 
 (Remember that 0 isn't processed because it can't be found.)
 
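
For readers skimming the diff, the counting scheme it introduces can be sketched outside of ZODB. This is a minimal illustration under stated assumptions, not the package's code: ``Length`` is a hypothetical stand-in that mimics only the ``change()`` and call interface of ``BTrees.Length.Length``, ``Bucket`` is a much simplified stand-in for ``CatalogEventQueue``, and the bucket count is reduced from 1009 to 7. The real ``BTrees.Length.Length`` is a persistent object that can also resolve concurrent write conflicts, which this sketch does not attempt.

```python
class Length(object):
    """Hypothetical stand-in for BTrees.Length.Length (not persistent)."""

    def __init__(self, value=0):
        self.value = value

    def change(self, delta):
        # Adjust the counter by a signed amount.
        self.value += delta

    def __call__(self):
        # Calling the object returns the current count.
        return self.value


class Bucket(object):
    """Simplified stand-in for CatalogEventQueue: one pending event per uid."""

    def __init__(self):
        self._data = {}

    def update(self, uid, etype):
        # Return 1 only if this uid had no pending event before the call,
        # so repeated events for the same uid are not counted twice.
        delta = 0 if uid in self._data else 1
        self._data[uid] = etype
        return delta


class Queue(object):
    _buckets = 7  # assumption for illustration; the diff uses 1009

    def __init__(self):
        self._length = Length()
        self._queues = [Bucket() for _ in range(self._buckets)]

    def __len__(self):
        return self._length()

    def _notify(self, uid, etype):
        # The bucket reports whether the pending count grew.
        bucket = self._queues[hash(uid) % self._buckets]
        self._length.change(bucket.update(uid, etype))

    def process(self, limit=None):
        # Drain up to ``limit`` events, decrementing once per event handled.
        done = 0
        for bucket in self._queues:
            while bucket._data and (limit is None or done < limit):
                bucket._data.popitem()
                done += 1
                self._length.change(-1)
        return done
```

The point of returning a delta from ``update`` is that ``len(queue)`` can then be answered from a single counter rather than by summing over every bucket.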


