[Checkins] SVN: zc.queue/trunk/src/zc/queue/ Add in queue
implementation and tests
Gary Poster
gary at zope.com
Wed May 3 11:08:00 EDT 2006
Log message for revision 67933:
Add in queue implementation and tests
Changed:
A zc.queue/trunk/src/zc/queue/__init__.py
A zc.queue/trunk/src/zc/queue/_queue.py
A zc.queue/trunk/src/zc/queue/interfaces.py
A zc.queue/trunk/src/zc/queue/queue.txt
A zc.queue/trunk/src/zc/queue/tests.py
-=-
Added: zc.queue/trunk/src/zc/queue/__init__.py
===================================================================
--- zc.queue/trunk/src/zc/queue/__init__.py 2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/__init__.py 2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,2 @@
+from zc.queue._queue import PersistentQueue, CompositePersistentQueue
+
Added: zc.queue/trunk/src/zc/queue/_queue.py
===================================================================
--- zc.queue/trunk/src/zc/queue/_queue.py 2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/_queue.py 2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,173 @@
+
+from persistent import Persistent
+from ZODB.POSException import ConflictError
+from zope import interface
+
+from zc.queue import interfaces
+
+class PersistentQueue(Persistent):
+
+ interface.implements(interfaces.IQueue)
+
+ def __init__(self):
+ self._data = ()
+
+ def pull(self, index=0):
+ if index < 0:
+ len_self = len(self._data)
+ index += len_self
+ if index < 0:
+ raise IndexError(index-len_self)
+ res = self._data[index]
+ self._data = self._data[:index] + self._data[index+1:]
+ return res
+
+ def put(self, item):
+ self._data += (item,)
+
+ def __len__(self):
+ return len(self._data)
+
+ def __iter__(self):
+ return iter(self._data)
+
+ def __getitem__(self, index):
+ return self._data[index] # works with passing a slice too
+
+ def __nonzero__(self):
+ return bool(self._data)
+
+ def _p_resolveConflict(self, oldstate, committedstate, newstate):
+ return resolveQueueConflict(oldstate, committedstate, newstate)
+
+def resolveQueueConflict(oldstate, committedstate, newstate):
+ # we only know how to merge _data. If anything else is different,
+ # puke.
+ if set(committedstate.keys()) != set(newstate.keys()):
+ raise ConflictError
+ for key, val in newstate.items():
+ if key != '_data' and val != committedstate[key]:
+ raise ConflictError
+ # basically, we are ok with anything--willing to merge--
+ # unless committedstate and newstate have one or more of the
+ # same *deletions* from the oldstate.
+ old = oldstate['_data']
+ committed = committedstate['_data']
+ new = newstate['_data']
+
+ old_set = set(old)
+ committed_set = set(committed)
+ new_set = set(new)
+
+ committed_added = committed_set - old_set
+ committed_removed = old_set - committed_set
+ new_added = new_set - old_set
+ new_removed = old_set - new_set
+
+ if new_removed & committed_removed:
+ # they both removed (claimed) the same one. Puke.
+ raise ConflictError
+ elif new_added & committed_added:
+ # they both added the same one. Puke.
+ raise ConflictError
+ # Now we do the merge. We'll merge into the committed state and
+ # return it.
+ mod_committed = []
+ for v in committed:
+ if v not in new_removed:
+ mod_committed.append(v)
+ if new_added:
+ ordered_new_added = new[-len(new_added):]
+ assert set(ordered_new_added) == new_added
+ mod_committed.extend(ordered_new_added)
+ committedstate['_data'] = tuple(mod_committed)
+ return committedstate
+
+class CompositePersistentQueue(Persistent):
+ """Appropriate for queues that may become large"""
+
+ interface.implements(interfaces.IQueue)
+
+ def __init__(self, compositeSize=15, subfactory=PersistentQueue):
+ # the compositeSize value is a ballpark. Because of the merging
+ # policy, a composite queue might get as big as 2n under unusual
+ # circumstances.
+ self.subfactory = subfactory
+ self._data = ()
+ self.compositeSize = compositeSize
+
+ def __nonzero__(self):
+ return bool(self._data)
+
+ def pull(self, index=0):
+ ct = 0
+ if index < 0:
+ len_self = len(self)
+ rindex = index + len_self # not efficient, but quick and easy
+ if rindex < 0:
+ raise IndexError(index)
+ else:
+ rindex = index
+ for cix, q in enumerate(self._data):
+ for ix, item in enumerate(q):
+ if rindex == ct:
+ q.pull(ix)
+ # take this opportunity to weed out empty
+ # composite queues that may have been introduced
+ # by conflict resolution merges or by this pull.
+ self._data = tuple(q for q in self._data if q)
+ return item
+ ct += 1
+ raise IndexError(index)
+
+ def put(self, item):
+ if not self._data:
+ self._data = (self.subfactory(),)
+ last = self._data[-1]
+ if len(last) >= self.compositeSize:
+ last = self.subfactory()
+ self._data += (last,)
+ last.put(item)
+
+ def __len__(self):
+ res = 0
+ for q in self._data:
+ res += len(q)
+ return res
+
+ def __iter__(self):
+ for q in self._data:
+ for i in q:
+ yield i
+
+ def __getitem__(self, index):
+ if isinstance(index, slice):
+ start, stop, stride = slice.indices(len(self))
+ res = []
+ stride_ct = 1
+ for ix, v in enumerate(self):
+ if ix >= stop:
+ break
+ if ix < start:
+ continue
+ stride_ct -= 1
+ if stride_ct == 0:
+ res.append(v)
+ stride_ct = stride
+ return res
+ else:
+ if index < 0: # not efficient, but quick and easy
+ len_self = len(self)
+ rindex = index + len_self
+ if rindex < 0:
+ raise IndexError(index)
+ else:
+ rindex = index
+ for ix, v in enumerate(self):
+ if ix == rindex:
+ return v
+ raise IndexError(index)
+
+ def _p_resolveConflict(self, oldstate, committedstate, newstate):
+ return resolveQueueConflict(oldstate, committedstate, newstate)
+
Added: zc.queue/trunk/src/zc/queue/interfaces.py
===================================================================
--- zc.queue/trunk/src/zc/queue/interfaces.py 2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/interfaces.py 2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,26 @@
+from persistent.interfaces import IPersistent
+
+class IQueue(IPersistent):
+ def put(item):
+ """Put an item on the end of the queue.
+
+ Item must be persistable (picklable)."""
+
+ def pull(index=0):
+ """Remove and return an item, by default from the front of the queue.
+
+ Raise IndexError if index does not exist.
+ """
+
+ def __len__():
+ """Return len of queue"""
+
+ def __iter__():
+ """Iterate over contents of queue"""
+
+ def __getitem__(index):
+ """return item at index, or slice"""
+
+ def __nonzero__():
+ """return True if the queue contains more than zero items, else False.
+ """
Added: zc.queue/trunk/src/zc/queue/queue.txt
===================================================================
--- zc.queue/trunk/src/zc/queue/queue.txt 2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/queue.txt 2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,388 @@
+=================
+Persistent Queues
+=================
+
+Persistent queues are simply queues that are optimized for persistence via the
+ZODB. They assume that the ZODB is using MVCC to avoid read conflicts. They
+attempt to resolve write conflicts so that transactions that add and remove
+objects simultaneously are merged, unless the transactions are trying to
+remove the same value from the queue.
+
+An important characteristic of these queues is that they do not expect to
+hold more than one reference to any given equivalent item at a time. For
+instance, some of the conflict resolution features will not behave as
+desired if it is reasonable for your application to hold two copies of the
+string "hello" within the same queue at once.
+
+The module provides two flavors: a simple persistent queue that keeps all
+contained objects in one persistent object (`PersistentQueue`), and a
+persistent queue that divides up its contents into multiple composite
+elements (`CompositePersistentQueue`). They should be equivalent in terms of
+API and so are mostly examined in the abstract in this document: we'll generate
+instances with a representative `Queue` factory, that could be either class.
+They only differ in an aspect of their write conflict resolution behavior,
+which is discussed below.
+
+Queues can be instantiated with no arguments.
+
+ >>> q = Queue()
+ >>> from zc.queue.interfaces import IQueue
+ >>> from zope.interface.verify import verifyObject
+ >>> verifyObject(IQueue, q)
+ True
+
+The basic API is simple: use `put` to add items to the back of the queue, and
+`pull` to pull things off the queue, defaulting to the front of the queue.
+
+ >>> q.put(1)
+ >>> q.put(2)
+ >>> q.pull()
+ 1
+ >>> q.put(3)
+ >>> q.pull()
+ 2
+ >>> q.pull()
+ 3
+
+The `pull` method takes an optional zero-based index argument, and can accept
+negative values.
+
+ >>> q.put(4)
+ >>> q.put(5)
+ >>> q.put(6)
+ >>> q.pull(-1)
+ 6
+ >>> q.pull(1)
+ 5
+ >>> q.pull(0)
+ 4
+
+Requesting an item from an empty queue raises an IndexError.
+
+ >>> q.pull() # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ IndexError: ...
+
+Requesting an invalid index value does the same.
+
+ >>> q.put(7)
+ >>> q.put(8)
+ >>> q.pull(2) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ IndexError: ...
+
+Beyond these core queue operations, queues support len...
+
+ >>> len(q)
+ 2
+ >>> q.pull()
+ 7
+ >>> len(q)
+ 1
+ >>> q.pull()
+ 8
+ >>> len(q)
+ 0
+
+...iter (which does *not* empty the queue)...
+
+ >>> iter(q).next()
+ Traceback (most recent call last):
+ ...
+ StopIteration
+ >>> q.put(9)
+ >>> q.put(10)
+ >>> q.put(11)
+ >>> iter(q).next()
+ 9
+ >>> [i for i in q]
+ [9, 10, 11]
+ >>> q.pull()
+ 9
+ >>> [i for i in q]
+ [10, 11]
+
+...bool...
+
+ >>> bool(q)
+ True
+ >>> q.pull()
+ 10
+ >>> q.pull()
+ 11
+ >>> bool(q)
+ False
+
+...and list-like bracket access (which again does *not* empty the queue).
+
+ >>> q.put(12)
+ >>> q[0]
+ 12
+ >>> q.pull()
+ 12
+ >>> q[0] # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ IndexError: ...
+ >>> for i in range (13, 23):
+ ... q.put(i)
+ ...
+ >>> q[0]
+ 13
+ >>> q[1]
+ 14
+ >>> q[2]
+ 15
+ >>> q[-1]
+ 22
+ >>> q[-10]
+ 13
+
+That's it--there's no additional way to add anything beyond `put`, and no
+additional way to remove anything beyond `pull`.
+
+The only other wrinkle is the conflict resolution code. To show this, we
+will have to have two copies of the same queue, from two different connections.
+
+NOTE: this testing approach has known weaknesses. See discussion in tests.py.
+
+ >>> import transaction
+ >>> from zc.queue.tests import ConflictResolvingMappingStorage
+ >>> from ZODB import DB
+ >>> db = DB(ConflictResolvingMappingStorage('test'))
+ >>> transactionmanager_1 = transaction.TransactionManager()
+ >>> transactionmanager_2 = transaction.TransactionManager()
+ >>> connection_1 = db.open(transaction_manager=transactionmanager_1)
+ >>> root_1 = connection_1.root()
+
+ >>> q = Queue()
+ >>> q.__name__ = "queue"
+ >>> root_1["queue"] = q
+ >>> del q
+ >>> transactionmanager_1.commit()
+ >>> q_1 = root_1['queue']
+
+ >>> transactionmanager_2 = transaction.TransactionManager()
+ >>> connection_2 = db.open(transaction_manager=transactionmanager_2)
+ >>> root_2 = connection_2.root()
+ >>> q_2 = root_2['queue']
+
+Now we have two copies of the same queue, with separate transaction managers
+and connections, in much the same way two threads would have them. The '_1'
+suffix identifies the objects for user 1, in thread 1; and the '_2' suffix
+identifies the objects for user 2, in a concurrent thread 2.
+
+First, let's have the two users add some items to the queue concurrently.
+For concurrent commits of only putting a single new item (one each in two
+transactions), in both types of queue the user who commits first gets the
+lower position in the queue--that is, the position that will leave the queue
+sooner using default `pull` calls.
+
+In this example, even though q_1 is modified first, q_2's transaction is
+committed first, so q_2's addition is first after the merge.
+
+ >>> q_1.put(1001)
+ >>> q_2.put(1000)
+ >>> transactionmanager_2.commit()
+ >>> transactionmanager_1.commit()
+ >>> connection_1.sync()
+ >>> connection_2.sync()
+ >>> list(q_1)
+ [1000, 1001]
+ >>> list(q_2)
+ [1000, 1001]
+
+For commits of more than one addition per connection across two connections,
+or of more than two concurrent adding transactions, the behavior is the same
+for the PersistentQueue: the first commit's additions will go before the
+second commit's.
+
+ >>> from zc import queue
+ >>> if isinstance(q_1, queue.PersistentQueue):
+ ... for i in range(5):
+ ... q_1.put(i)
+ ... for i in range(1002, 1005):
+ ... q_2.put(i)
+ ... transactionmanager_2.commit()
+ ... transactionmanager_1.commit()
+ ... connection_1.sync()
+ ... connection_2.sync()
+ ...
+
+As we'll see below, that will again reliably put all the values from the first
+commit earlier in the queue, to result in
+[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4].
+
+For the CompositePersistentQueue, the behavior is different. The order
+will be maintained within a set of additions in a transaction, but the values
+may be merged between the two transactions' additions. We will compensate
+for that here to get a reliable queue state.
+
+ >>> if isinstance(q_1, queue.CompositePersistentQueue):
+ ... for i1, i2 in ((1002, 1003), (1004, 0), (1, 2), (3, 4)):
+ ... q_1.put(i1)
+ ... q_2.put(i2)
+ ... transactionmanager_1.commit()
+ ... transactionmanager_2.commit()
+ ... connection_1.sync()
+ ... connection_2.sync()
+ ...
+
+Whichever kind of queue we have, we now have the following values.
+
+ >>> list(q_1)
+ [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]
+ >>> list(q_2)
+ [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]
+
+If two users try to add the same item, then a conflict error is raised.
+
+ >>> q_1.put(5)
+ >>> q_2.put(5)
+ >>> transactionmanager_1.commit()
+ >>> transactionmanager_2.commit() # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ ConflictError: ...
+ >>> transactionmanager_2.abort()
+ >>> connection_1.sync()
+ >>> connection_2.sync()
+ >>> list(q_1)
+ [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]
+ >>> list(q_2)
+ [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]
+
+Users can also concurrently remove items from a queue...
+
+ >>> q_1.pull()
+ 1000
+ >>> q_1[0]
+ 1001
+
+ >>> q_2.pull(5)
+ 0
+ >>> q_2[5]
+ 1
+
+ >>> q_2[0] # 1000 value still there in this connection
+ 1000
+
+ >>> q_1[4] # 0 value still there in this connection.
+ 0
+
+ >>> transactionmanager_1.commit()
+ >>> transactionmanager_2.commit()
+ >>> connection_1.sync()
+ >>> connection_2.sync()
+ >>> list(q_1)
+ [1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]
+ >>> list(q_2)
+ [1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]
+
+...as long as they don't remove the same item.
+
+ >>> q_1.pull()
+ 1001
+ >>> q_2.pull()
+ 1001
+ >>> transactionmanager_1.commit()
+ >>> transactionmanager_2.commit() # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ ConflictError: ...
+ >>> transactionmanager_2.abort()
+ >>> connection_1.sync()
+ >>> connection_2.sync()
+ >>> list(q_1)
+ [1002, 1003, 1004, 1, 2, 3, 4, 5]
+ >>> list(q_2)
+ [1002, 1003, 1004, 1, 2, 3, 4, 5]
+
+Also importantly, users can concurrently remove and add items to a queue.
+
+ >>> q_1.pull()
+ 1002
+ >>> q_1.pull()
+ 1003
+ >>> q_1.pull()
+ 1004
+ >>> q_2.put(6)
+ >>> q_2.put(7)
+ >>> transactionmanager_1.commit()
+ >>> transactionmanager_2.commit()
+ >>> connection_1.sync()
+ >>> connection_2.sync()
+ >>> list(q_1)
+ [1, 2, 3, 4, 5, 6, 7]
+ >>> list(q_2)
+ [1, 2, 3, 4, 5, 6, 7]
+
+As a final example, we'll show the conflict resolution code under extreme
+duress, with multiple simultaneous puts and pulls.
+
+ >>> res_1 = []
+ >>> for i in range(6, -1, -2):
+ ... res_1.append(q_1.pull(i))
+ ...
+ >>> res_1
+ [7, 5, 3, 1]
+ >>> res_2 = []
+ >>> for i in range(5, 0, -2):
+ ... res_2.append(q_2.pull(i))
+ ...
+ >>> res_2
+ [6, 4, 2]
+ >>> for i in range(8, 12):
+ ... q_1.put(i)
+ ...
+ >>> for i in range(12, 16):
+ ... q_2.put(i)
+ ...
+ >>> list(q_1)
+ [2, 4, 6, 8, 9, 10, 11]
+ >>> list(q_2)
+ [1, 3, 5, 7, 12, 13, 14, 15]
+ >>> transactionmanager_1.commit()
+ >>> transactionmanager_2.commit()
+ >>> connection_1.sync()
+ >>> connection_2.sync()
+
+After these commits, if the queue is a PersistentQueue, the new values are
+in the order of their commit. However, as discussed above, if the queue is
+a CompositePersistentQueue the behavior is different. While the order will be
+maintained comparatively--so if object `A` is ahead of object `B` in the queue
+on commit then `A` will still be ahead of `B` after a merge of the conflicting
+transactions--values may be interspersed between the two transactions.
+
+For instance, if our example queue were a PersistentQueue, the values would
+be [8, 9, 10, 11, 12, 13, 14, 15]. However, if it were a
+CompositePersistentQueue, the values might be the same, or might be any
+combination in which [8, 9, 10, 11] and [12, 13, 14, 15], from the two
+transactions, are still in order. One ordering might be
+[8, 9, 12, 13, 10, 11, 14, 15], for instance.
+
+ >>> if isinstance(q_1, queue.PersistentQueue):
+ ... res_1 = list(q_1)
+ ... res_2 = list(q_2)
+ ... elif isinstance(q_1, queue.CompositePersistentQueue):
+ ... firstsrc_1 = list(q_1)
+ ... firstsrc_2 = list(q_2)
+ ... secondsrc_1 = firstsrc_1[:]
+ ... secondsrc_2 = firstsrc_2[:]
+ ... for val in [12, 13, 14, 15]:
+ ... firstsrc_1.remove(val)
+ ... firstsrc_2.remove(val)
+ ... for val in [8, 9, 10, 11]:
+ ... secondsrc_1.remove(val)
+ ... secondsrc_2.remove(val)
+ ... res_1 = firstsrc_1 + secondsrc_1
+ ... res_2 = firstsrc_2 + secondsrc_2
+ ...
+ >>> res_1
+ [8, 9, 10, 11, 12, 13, 14, 15]
+ >>> res_2
+ [8, 9, 10, 11, 12, 13, 14, 15]
+
+ >>> db.close() # cleanup
Property changes on: zc.queue/trunk/src/zc/queue/queue.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.queue/trunk/src/zc/queue/tests.py
===================================================================
--- zc.queue/trunk/src/zc/queue/tests.py 2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/tests.py 2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,79 @@
+import unittest
+from zope.testing import doctest, module
+from ZODB import ConflictResolution, MappingStorage, POSException
+
+from zc import queue
+
+# TODO: this approach is useful, but fragile. It also puts a dependency in
+# this package on the ZODB, when otherwise it would only depend on persistent.
+#
+# Other discussed testing approaches include passing values explicitly to the
+# queue's conflict resolution code, and having a simple database implemented in
+# the persistent package.
+#
+# This approach is arguably useful in two ways. First, it confirms that the
+# conflict resolution code performs as expected in the desired environment,
+# the ZODB. Second, in the doctest it shows real examples of the queue usage,
+# with transaction managers and all: this gives a clearer picture of the
+# full context in which this conflict resolution code must dance.
+
+class ConflictResolvingMappingStorage(
+ MappingStorage.MappingStorage,
+ ConflictResolution.ConflictResolvingStorage):
+
+ def __init__(self, name='ConflictResolvingMappingStorage'):
+ MappingStorage.MappingStorage.__init__(self, name)
+ self._old = {}
+
+ def loadSerial(self, oid, serial):
+ self._lock_acquire()
+ try:
+ old_info = self._old[oid]
+ try:
+ return old_info[serial]
+ except KeyError:
+ raise POSException.POSKeyError(oid)
+ finally:
+ self._lock_release()
+
+ def store(self, oid, serial, data, version, transaction):
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+
+ if version:
+ raise POSException.Unsupported("Versions aren't supported")
+
+ self._lock_acquire()
+ try:
+ if oid in self._index:
+ oserial = self._index[oid][:8]
+ if serial != oserial:
+ rdata = self.tryToResolveConflict(
+ oid, oserial, serial, data)
+ if rdata is None:
+ raise POSException.ConflictError(
+ oid=oid, serials=(oserial, serial), data=data)
+ else:
+ data = rdata
+ self._tindex[oid] = self._tid + data
+ finally:
+ self._lock_release()
+ return self._tid
+
+ def _finish(self, tid, user, desc, ext):
+ self._index.update(self._tindex)
+ self._ltid = self._tid
+ for oid, record in self._tindex.items():
+ self._old.setdefault(oid, {})[self._tid] = record[8:]
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocFileSuite(
+ 'queue.txt', globs={'Queue':queue.PersistentQueue}),
+ doctest.DocFileSuite(
+ 'queue.txt',
+ globs={'Queue':lambda: queue.CompositePersistentQueue(2)}),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
More information about the Checkins
mailing list