[Checkins] SVN: zc.catalogqueue/trunk/src/zc/catalogqueue/ track the number of pending cataloging events
Fred L. Drake, Jr.
fdrake at gmail.com
Fri May 9 13:58:51 EDT 2008
Log message for revision 86576:
track the number of pending cataloging events
Changed:
U zc.catalogqueue/trunk/src/zc/catalogqueue/CHANGES.txt
U zc.catalogqueue/trunk/src/zc/catalogqueue/CatalogEventQueue.py
U zc.catalogqueue/trunk/src/zc/catalogqueue/interfaces.py
U zc.catalogqueue/trunk/src/zc/catalogqueue/queue.py
U zc.catalogqueue/trunk/src/zc/catalogqueue/queue.txt
-=-
Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/CHANGES.txt
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/CHANGES.txt 2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/CHANGES.txt 2008-05-09 17:58:50 UTC (rev 86576)
@@ -6,10 +6,13 @@
Added some processing state information as attributes of the queue object:
-``lastProcessedTime``
+``ICatalogQueue.__len__()``
+ Return the number of pending cataloging events.
+
+``ICatalogQueue.lastProcessedTime``
Time of the last successful processing.
-``totalProcessed``
+``ICatalogQueue.totalProcessed``
Total number of cataloging events processed.
Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/CatalogEventQueue.py
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/CatalogEventQueue.py 2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/CatalogEventQueue.py 2008-05-09 17:58:50 UTC (rev 86576)
@@ -133,6 +133,7 @@
data = self._data
current = data.get(uid)
if current is not None:
+ delta = 0
generation, current = current
if current in ADDED_EVENTS and etype is ADDED:
raise TypeError("Attempt to add an object that is already "
@@ -146,11 +147,13 @@
etype = CHANGED_ADDED
else:
+ delta = 1
generation = 0
data[uid] = generation+1, etype
self._p_changed = 1
+ return delta
def getEvent(self, uid):
state = self._data.get(uid)
Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/interfaces.py
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/interfaces.py 2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/interfaces.py 2008-05-09 17:58:50 UTC (rev 86576)
@@ -37,6 +37,9 @@
zope.index.interfaces.IInjection objects to be updated.
"""
+ def __len__():
+ """Return the number of unprocessed cataloging events."""
+
lastProcessedTime = zope.interface.Attribute(
"""Time the queue was last processed.
Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/queue.py
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/queue.py 2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/queue.py 2008-05-09 17:58:50 UTC (rev 86576)
@@ -14,6 +14,7 @@
from zc.catalogqueue.CatalogEventQueue import REMOVED, CHANGED, ADDED
+import BTrees.Length
import datetime
import logging
import persistent
@@ -35,13 +36,18 @@
_buckets = 1009 # Maybe configurable someday
def __init__(self):
+ self._length = BTrees.Length.Length()
self._queues = [
zc.catalogqueue.CatalogEventQueue.CatalogEventQueue()
for i in range(self._buckets)
]
+ def __len__(self):
+ return self._length()
+
def _notify(self, id, event):
- self._queues[hash(id) % self._buckets].update(id, event)
+ self._length.change(
+ self._queues[hash(id) % self._buckets].update(id, event))
def add(self, id):
self._notify(id, ADDED)
@@ -67,6 +73,7 @@
for catalog in catalogs:
catalog.index_doc(id, ob)
done += 1
+ self._length.change(-1)
if done >= limit:
break
Modified: zc.catalogqueue/trunk/src/zc/catalogqueue/queue.txt
===================================================================
--- zc.catalogqueue/trunk/src/zc/catalogqueue/queue.txt 2008-05-09 17:44:03 UTC (rev 86575)
+++ zc.catalogqueue/trunk/src/zc/catalogqueue/queue.txt 2008-05-09 17:58:50 UTC (rev 86576)
@@ -25,6 +25,12 @@
>>> queue.totalProcessed
0
+The length of the queue provides access to the number of pending cataloging
+events:
+
+ >>> len(queue)
+ 0
+
Queues are used in 2 ways. As content are modified, we call add,
update, and remove methods on the queue:
@@ -56,12 +62,15 @@
>>> queue.update(0)
At this point, we've added several events, but haven't processed the queue, so
-we expect the ``lastProcessedTime`` and ``totalProcessed`` to be unchanged:
+we expect ``lastProcessedTime``, ``totalProcessed`` to be unchanged, but the
+queue length to reflect the pending tasks:
>>> print queue.lastProcessedTime
None
>>> queue.totalProcessed
0
+ >>> len(queue)
+ 6
Periodically, we call process on the queue. We need to pass an ids
object and a collection of injection (catalog) objects:
@@ -114,6 +123,11 @@
>>> previous_time = queue.lastProcessedTime
+The length of the queue now indicates that no further events are pending:
+
+ >>> len(queue)
+ 0
+
If we process the queue without additional events, we'll just get 0
back:
@@ -129,12 +143,15 @@
>>> queue.totalProcessed
6
- >>> previous_time = queue.lastProcessedTime
+ >>> len(queue)
+ 0
Of course, the limit argument limits how many events we process:
>>> for i in range(10):
... queue.update(i)
+ >>> len(queue)
+ 10
>>> queue.process(Ids(), [Injection('cat1')], 5)
cat1 indexing 1 object 1
cat1 indexing 2 object 2
@@ -143,6 +160,8 @@
5
>>> queue.totalProcessed
11
+ >>> len(queue)
+ 5
>>> queue.process(Ids(), [Injection('cat1')], 5)
cat1 indexing 5 object 5
@@ -153,6 +172,8 @@
5
>>> queue.totalProcessed
16
+ >>> len(queue)
+ 0
(Remember that 0 isn't processed because it can't be found.)
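
The counting scheme in this revision can be illustrated with a small standalone sketch. It is not the zc.catalogqueue code: the ``Bucket`` and ``Queue`` classes below are hypothetical stand-ins, a plain integer replaces ``BTrees.Length.Length``, and the event merging done by the real ``CatalogEventQueue.update`` is left out. It only shows the idea visible in the diff: the per-bucket ``update`` returns 1 when an id had no pending event and 0 otherwise, and the queue adds that delta to its length, then subtracts 1 for each event it processes.

```python
# Simplified sketch of the length tracking added in this revision.
# Bucket and Queue are hypothetical stand-ins, not the real classes.
CHANGED = "changed"


class Bucket:
    """Holds at most one pending event per uid."""

    def __init__(self):
        self._data = {}

    def update(self, uid, etype):
        # Report 1 only if the uid had no pending event before this call,
        # mirroring the ``delta`` returned by CatalogEventQueue.update().
        delta = 0 if uid in self._data else 1
        self._data[uid] = etype
        return delta

    def pop_all(self):
        # Hand back all pending events and start over with an empty mapping.
        events, self._data = self._data, {}
        return events


class Queue:
    def __init__(self, buckets=7):
        # Assumption: a plain int; the commit uses BTrees.Length.Length.
        self._length = 0
        self._queues = [Bucket() for _ in range(buckets)]

    def __len__(self):
        return self._length

    def _notify(self, uid, etype):
        bucket = self._queues[hash(uid) % len(self._queues)]
        self._length += bucket.update(uid, etype)

    def update(self, uid):
        self._notify(uid, CHANGED)

    def process(self):
        # Drain every bucket, decrementing the length once per event.
        done = 0
        for bucket in self._queues:
            for _uid in bucket.pop_all():
                done += 1
                self._length -= 1
        return done


queue = Queue()
for i in range(3):
    queue.update(i)
queue.update(0)  # a second event for the same id does not grow the length
pending_before = len(queue)
processed = queue.process()
pending_after = len(queue)
```

With three distinct ids queued, ``pending_before`` is 3 even though ``update`` was called four times, and ``pending_after`` drops back to 0 once ``process`` has handled them.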