[Checkins] SVN: zc.queue/trunk/src/zc/queue/ Add in queue implementation and tests

Gary Poster gary at zope.com
Wed May 3 11:08:00 EDT 2006


Log message for revision 67933:
  Add in queue implementation and tests
  

Changed:
  A   zc.queue/trunk/src/zc/queue/__init__.py
  A   zc.queue/trunk/src/zc/queue/_queue.py
  A   zc.queue/trunk/src/zc/queue/interfaces.py
  A   zc.queue/trunk/src/zc/queue/queue.txt
  A   zc.queue/trunk/src/zc/queue/tests.py

-=-
Added: zc.queue/trunk/src/zc/queue/__init__.py
===================================================================
--- zc.queue/trunk/src/zc/queue/__init__.py	2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/__init__.py	2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,2 @@
+from zc.queue._queue import PersistentQueue, CompositePersistentQueue
+

Added: zc.queue/trunk/src/zc/queue/_queue.py
===================================================================
--- zc.queue/trunk/src/zc/queue/_queue.py	2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/_queue.py	2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,173 @@
+
+from persistent import Persistent
+from ZODB.POSException import ConflictError
+from zope import interface
+
+from zc.queue import interfaces
+
+class PersistentQueue(Persistent):
+
+    interface.implements(interfaces.IQueue)
+
+    def __init__(self):
+        self._data = ()
+
+    def pull(self, index=0):
+        if index < 0:
+            len_self = len(self._data)
+            index += len_self
+            if index < 0:
+                raise IndexError(index-len_self)
+        res = self._data[index]
+        self._data = self._data[:index] + self._data[index+1:]
+        return res
+
+    def put(self, item):
+        self._data += (item,)
+
+    def __len__(self):
+        return len(self._data)
+
+    def __iter__(self):
+        return iter(self._data)
+
+    def __getitem__(self, index):
+        return self._data[index] # works with passing a slice too
+
+    def __nonzero__(self):
+        return bool(self._data)
+
+    def _p_resolveConflict(self, oldstate, committedstate, newstate):
+        return resolveQueueConflict(oldstate, committedstate, newstate)
+
+def resolveQueueConflict(oldstate, committedstate, newstate):
+    # we only know how to merge _data.  If anything else is different,
+    # puke.
+    if set(committedstate.keys()) != set(newstate.keys()):
+        raise ConflictError
+    for key, val in newstate.items():
+        if key != '_data' and val != committedstate[key]:
+            raise ConflictError
+    # basically, we are ok with anything--willing to merge--
+    # unless committedstate and newstate have one or more of the
+    # same *deletions* from the oldstate.
+    old = oldstate['_data']
+    committed = committedstate['_data']
+    new = newstate['_data']
+
+    old_set = set(old)
+    committed_set = set(committed)
+    new_set = set(new)
+
+    committed_added = committed_set - old_set
+    committed_removed = old_set - committed_set
+    new_added = new_set - old_set
+    new_removed = old_set - new_set
+
+    if new_removed & committed_removed:
+        # they both removed (claimed) the same one.  Puke.
+        raise ConflictError
+    elif new_added & committed_added:
+        # they both added the same one.  Puke.
+        raise ConflictError
+    # Now we do the merge.  We'll merge into the committed state and
+    # return it.
+    mod_committed = []
+    for v in committed:
+        if v not in new_removed:
+            mod_committed.append(v)
+    if new_added:
+        ordered_new_added = new[-len(new_added):]
+        assert set(ordered_new_added) == new_added
+        mod_committed.extend(ordered_new_added)
+    committedstate['_data'] = tuple(mod_committed)
+    return committedstate
+
+class CompositePersistentQueue(Persistent):
+    """Appropriate for queues that may become large"""
+
+    interface.implements(interfaces.IQueue)
+
+    def __init__(self, compositeSize=15, subfactory=PersistentQueue):
+        # the compositeSize value is a ballpark.  Because of the merging
+        # policy, a composite queue might get as big as 2n under unusual
+        # circumstances.
+        self.subfactory = subfactory
+        self._data = ()
+        self.compositeSize = compositeSize
+
+    def __nonzero__(self):
+        return bool(self._data)
+
+    def pull(self, index=0):
+        ct = 0
+        if index < 0:
+            len_self = len(self)
+            rindex = index + len_self # not efficient, but quick and easy
+            if rindex < 0:
+                raise IndexError(index)
+        else:
+            rindex = index
+        for cix, q in enumerate(self._data):
+            for ix, item in enumerate(q):
+                if rindex == ct:
+                    q.pull(ix)
+                    # take this opportunity to weed out empty
+                    # composite queues that may have been introduced
+                    # by conflict resolution merges or by this pull.
+                    self._data = tuple(q for q in self._data if q)
+                    return item
+                ct += 1
+        raise IndexError(index)
+
+    def put(self, item):
+        if not self._data:
+            self._data = (self.subfactory(),)
+        last = self._data[-1]
+        if len(last) >= self.compositeSize:
+            last = self.subfactory()
+            self._data += (last,)
+        last.put(item)
+
+    def __len__(self):
+        res = 0
+        for q in self._data:
+            res += len(q)
+        return res
+
+    def __iter__(self):
+        for q in self._data:
+            for i in q:
+                yield i
+
+    def __getitem__(self, index):
+        if isinstance(index, slice):
+            start, stop, stride = slice.indices(len(self))
+            res = []
+            stride_ct = 1
+            for ix, v in enumerate(self):
+                if ix >= stop:
+                    break
+                if ix < start:
+                    continue
+                stride_ct -= 1
+                if stride_ct == 0:
+                    res.append(v)
+                    stride_ct = stride
+            return res
+        else:
+            if index < 0: # not efficient, but quick and easy
+                len_self = len(self)
+                rindex = index + len_self
+                if rindex < 0:
+                    raise IndexError(index)
+            else:
+                rindex = index
+            for ix, v in enumerate(self):
+                if ix == rindex:
+                    return v
+            raise IndexError(index)
+
+    def _p_resolveConflict(self, oldstate, committedstate, newstate):
+        return resolveQueueConflict(oldstate, committedstate, newstate)
+

Added: zc.queue/trunk/src/zc/queue/interfaces.py
===================================================================
--- zc.queue/trunk/src/zc/queue/interfaces.py	2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/interfaces.py	2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,26 @@
+from persistent.interfaces import IPersistent
+
+class IQueue(IPersistent):
+    def put(item):
+        """Put an item on the end of the queue.
+
+        Item must be persistable (picklable)."""
+
+    def pull(index=0):
+        """Remove and return an item, by default from the front of the queue.
+
+        Raise IndexError if index does not exist.
+        """
+
+    def __len__():
+        """Return len of queue"""
+
+    def __iter__():
+        """Iterate over contents of queue"""
+
+    def __getitem__(index):
+        """return item at index, or slice"""
+
+    def __nonzero__():
+        """return True if the queue contains more than zero items, else False.
+        """

Added: zc.queue/trunk/src/zc/queue/queue.txt
===================================================================
--- zc.queue/trunk/src/zc/queue/queue.txt	2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/queue.txt	2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,388 @@
+=================
+Persistent Queues
+=================
+
+Persistent queues are simply queues that are optimized for persistency via the
+ZODB. They assume that the ZODB is using MVCC to avoid read conflicts. They
+attempt to resolve write conflicts so that transactions that add and remove
+objects simultaneously are merged, unless the transactions are trying to
+remove the same value from the queue.
+
+An important characteristic of these queues is that they do not expect to
+hold more than one reference to any given equivalent item at a time.  For
+instance, some of the conflict resolution features will not perform
+desirably if it is reasonable for your application to hold two copies of the
+string "hello" within the same queue at once.
+
+The module provides two flavors: a simple persistent queue that keeps all
+contained objects in one persistent object (`PersistentQueue`), and a
+persistent queue that divides up its contents into multiple composite
+elements (`CompositePersistentQueue`). They should be equivalent in terms of
+API and so are mostly examined in the abstract in this document: we'll generate
+instances with a representative `Queue` factory, that could be either class.
+They only differ in an aspect of their write conflict resolution behavior,
+which is discussed below.
+
+Queues can be instantiated with no arguments.
+
+    >>> q = Queue()
+    >>> from zc.queue.interfaces import IQueue
+    >>> from zope.interface.verify import verifyObject
+    >>> verifyObject(IQueue, q)
+    True
+
+The basic API is simple: use `put` to add items to the back of the queue, and
+`pull` to pull things off the queue, defaulting to the front of the queue.
+
+    >>> q.put(1)
+    >>> q.put(2)
+    >>> q.pull()
+    1
+    >>> q.put(3)
+    >>> q.pull()
+    2
+    >>> q.pull()
+    3
+
+The `pull` method takes an optional zero-based index argument, and can accept
+negative values.
+
+    >>> q.put(4)
+    >>> q.put(5)
+    >>> q.put(6)
+    >>> q.pull(-1)
+    6
+    >>> q.pull(1)
+    5
+    >>> q.pull(0)
+    4
+
+Requesting an item from an empty queue raises an IndexError.
+
+    >>> q.pull() # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    IndexError: ...
+
+Requesting an invalid index value does the same.
+
+    >>> q.put(7)
+    >>> q.put(8)
+    >>> q.pull(2) # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    IndexError: ...
+
+Beyond these core queue operations, queues support len...
+
+    >>> len(q)
+    2
+    >>> q.pull()
+    7
+    >>> len(q)
+    1
+    >>> q.pull()
+    8
+    >>> len(q)
+    0
+
+...iter (which does *not* empty the queue)...
+
+    >>> iter(q).next()
+    Traceback (most recent call last):
+    ...
+    StopIteration
+    >>> q.put(9)
+    >>> q.put(10)
+    >>> q.put(11)
+    >>> iter(q).next()
+    9
+    >>> [i for i in q]
+    [9, 10, 11]
+    >>> q.pull()
+    9
+    >>> [i for i in q]
+    [10, 11]
+
+...bool...
+
+    >>> bool(q)
+    True
+    >>> q.pull()
+    10
+    >>> q.pull()
+    11
+    >>> bool(q)
+    False
+
+...and list-like bracket access (which again does *not* empty the queue).
+
+    >>> q.put(12)
+    >>> q[0]
+    12
+    >>> q.pull()
+    12
+    >>> q[0] # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    IndexError: ...
+    >>> for i in range (13, 23):
+    ...     q.put(i)
+    ...
+    >>> q[0]
+    13
+    >>> q[1]
+    14
+    >>> q[2]
+    15
+    >>> q[-1]
+    22
+    >>> q[-10]
+    13
+
+That's it--there's no additional way to add anything beyond `put`, and no
+additional way to remove anything beyond `pull`.
+
+The only other wrinkle is the conflict resolution code.  To show this, we
+will have to have two copies of the same queue, from two different connections.
+
+NOTE: this testing approach has known weaknesses.  See discussion in tests.py.
+
+    >>> import transaction
+    >>> from zc.queue.tests import ConflictResolvingMappingStorage
+    >>> from ZODB import DB
+    >>> db = DB(ConflictResolvingMappingStorage('test'))
+    >>> transactionmanager_1 = transaction.TransactionManager()
+    >>> transactionmanager_2 = transaction.TransactionManager()
+    >>> connection_1 = db.open(transaction_manager=transactionmanager_1)
+    >>> root_1 = connection_1.root()
+
+    >>> q = Queue()
+    >>> q.__name__ = "queue"
+    >>> root_1["queue"] = q
+    >>> del q
+    >>> transactionmanager_1.commit()
+    >>> q_1 = root_1['queue']
+
+    >>> transactionmanager_2 = transaction.TransactionManager()
+    >>> connection_2 = db.open(transaction_manager=transactionmanager_2)
+    >>> root_2 = connection_2.root()
+    >>> q_2 = root_2['queue']
+
+Now we have two copies of the same queue, with separate transaction managers
+and connections about the same way two threads would have them. The '_1'
+suffix identifies the objects for user 1, in thread 1; and the '_2' suffix
+identifies the objects for user 2, in a concurrent thread 2.
+
+First, let's have the two users add some items to the queue concurrently.
+For concurrent commits of only putting a single new item (one each in two
+transactions), in both types of queue the user who commits first gets the
+lower position in the queue--that is, the position that will leave the queue
+sooner using default `pull` calls.
+
+In this example, even though q_1 is modified first, q_2's transaction is
+committed first, so q_2's addition is first after the merge.
+
+    >>> q_1.put(1001)
+    >>> q_2.put(1000)
+    >>> transactionmanager_2.commit()
+    >>> transactionmanager_1.commit()
+    >>> connection_1.sync()
+    >>> connection_2.sync()
+    >>> list(q_1)
+    [1000, 1001]
+    >>> list(q_2)
+    [1000, 1001]
+
+For commits that add more than one item in each of the two connections, or for
+more than two concurrent adding transactions, the PersistentQueue behaves the
+same way: all of the first commit's additions will go before the second
+commit's.
+
+    >>> from zc import queue
+    >>> if isinstance(q_1, queue.PersistentQueue):
+    ...     for i in range(5):
+    ...         q_1.put(i)
+    ...     for i in range(1002, 1005):
+    ...         q_2.put(i)
+    ...     transactionmanager_2.commit()
+    ...     transactionmanager_1.commit()
+    ...     connection_1.sync()
+    ...     connection_2.sync()
+    ...
+
+As we'll see below, that will again reliably put all the values from the first
+commit earlier in the queue, to result in
+[1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4].
+
+For the CompositePersistentQueue, the behavior is different.  The order of
+the additions within a single transaction will be maintained, but the values
+added by the two transactions may be interleaved.  We will compensate for
+that here to get a reliable queue state.
+
+    >>> if isinstance(q_1, queue.CompositePersistentQueue):
+    ...     for i1, i2 in ((1002, 1003), (1004, 0), (1, 2), (3, 4)):
+    ...         q_1.put(i1)
+    ...         q_2.put(i2)
+    ...         transactionmanager_1.commit()
+    ...         transactionmanager_2.commit()
+    ...         connection_1.sync()
+    ...         connection_2.sync()
+    ...
+
+Whichever kind of queue we have, we now have the following values.
+
+    >>> list(q_1)
+    [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]
+    >>> list(q_2)
+    [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4]
+
+If two users try to add the same item, then a conflict error is raised.
+
+    >>> q_1.put(5)
+    >>> q_2.put(5)
+    >>> transactionmanager_1.commit()
+    >>> transactionmanager_2.commit() # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    ConflictError: ...
+    >>> transactionmanager_2.abort()
+    >>> connection_1.sync()
+    >>> connection_2.sync()
+    >>> list(q_1)
+    [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]
+    >>> list(q_2)
+    [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4, 5]
+
+Users can also concurrently remove items from a queue...
+
+    >>> q_1.pull()
+    1000
+    >>> q_1[0]
+    1001
+
+    >>> q_2.pull(5)
+    0
+    >>> q_2[5]
+    1
+
+    >>> q_2[0] # 1000 value still there in this connection
+    1000
+
+    >>> q_1[4] # 0 value still there in this connection.
+    0
+
+    >>> transactionmanager_1.commit()
+    >>> transactionmanager_2.commit()
+    >>> connection_1.sync()
+    >>> connection_2.sync()
+    >>> list(q_1)
+    [1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]
+    >>> list(q_2)
+    [1001, 1002, 1003, 1004, 1, 2, 3, 4, 5]
+
+...as long as they don't remove the same item.
+
+    >>> q_1.pull()
+    1001
+    >>> q_2.pull()
+    1001
+    >>> transactionmanager_1.commit()
+    >>> transactionmanager_2.commit() # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    ConflictError: ...
+    >>> transactionmanager_2.abort()
+    >>> connection_1.sync()
+    >>> connection_2.sync()
+    >>> list(q_1)
+    [1002, 1003, 1004, 1, 2, 3, 4, 5]
+    >>> list(q_2)
+    [1002, 1003, 1004, 1, 2, 3, 4, 5]
+
+Also importantly, users can concurrently remove and add items to a queue.
+
+    >>> q_1.pull()
+    1002
+    >>> q_1.pull()
+    1003
+    >>> q_1.pull()
+    1004
+    >>> q_2.put(6)
+    >>> q_2.put(7)
+    >>> transactionmanager_1.commit()
+    >>> transactionmanager_2.commit()
+    >>> connection_1.sync()
+    >>> connection_2.sync()
+    >>> list(q_1)
+    [1, 2, 3, 4, 5, 6, 7]
+    >>> list(q_2)
+    [1, 2, 3, 4, 5, 6, 7]
+
+As a final example, we'll show the conflict resolution code under extreme
+duress, with multiple simultaneous puts and pulls.
+
+    >>> res_1 = []
+    >>> for i in range(6, -1, -2):
+    ...     res_1.append(q_1.pull(i))
+    ...
+    >>> res_1
+    [7, 5, 3, 1]
+    >>> res_2 = []
+    >>> for i in range(5, 0, -2):
+    ...     res_2.append(q_2.pull(i))
+    ...
+    >>> res_2
+    [6, 4, 2]
+    >>> for i in range(8, 12):
+    ...     q_1.put(i)
+    ...
+    >>> for i in range(12, 16):
+    ...     q_2.put(i)
+    ...
+    >>> list(q_1)
+    [2, 4, 6, 8, 9, 10, 11]
+    >>> list(q_2)
+    [1, 3, 5, 7, 12, 13, 14, 15]
+    >>> transactionmanager_1.commit()
+    >>> transactionmanager_2.commit()
+    >>> connection_1.sync()
+    >>> connection_2.sync()
+
+After these commits, if the queue is a PersistentQueue, the new values are
+in the order of their commit.  However, as discussed above, if the queue is
+a CompositePersistentQueue the behavior is different. While relative order
+is maintained--so if object `A` is ahead of object `B` in the queue on
+commit, then `A` will still be ahead of `B` after a merge of the conflicting
+transactions--values from the two transactions may be interspersed.
+
+For instance, if our example queue were a PersistentQueue, the values would
+be [8, 9, 10, 11, 12, 13, 14, 15].  However, if it were a
+CompositePersistentQueue, the values might be the same, or might be any
+combination in which [8, 9, 10, 11] and [12, 13, 14, 15], from the two
+transactions, are still in order.  One ordering might be
+[8, 9, 12, 13, 10, 11, 14, 15], for instance.
+
+    >>> if isinstance(q_1, queue.PersistentQueue):
+    ...     res_1 = list(q_1)
+    ...     res_2 = list(q_2)
+    ... elif isinstance(q_1, queue.CompositePersistentQueue):
+    ...     firstsrc_1 = list(q_1)
+    ...     firstsrc_2 = list(q_2)
+    ...     secondsrc_1 = firstsrc_1[:]
+    ...     secondsrc_2 = firstsrc_2[:]
+    ...     for val in [12, 13, 14, 15]:
+    ...         firstsrc_1.remove(val)
+    ...         firstsrc_2.remove(val)
+    ...     for val in [8, 9, 10, 11]:
+    ...         secondsrc_1.remove(val)
+    ...         secondsrc_2.remove(val)
+    ...     res_1 = firstsrc_1 + secondsrc_1
+    ...     res_2 = firstsrc_2 + secondsrc_2
+    ...
+    >>> res_1
+    [8, 9, 10, 11, 12, 13, 14, 15]
+    >>> res_2
+    [8, 9, 10, 11, 12, 13, 14, 15]
+
+    >>> db.close() # cleanup


Property changes on: zc.queue/trunk/src/zc/queue/queue.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.queue/trunk/src/zc/queue/tests.py
===================================================================
--- zc.queue/trunk/src/zc/queue/tests.py	2006-05-03 14:36:10 UTC (rev 67932)
+++ zc.queue/trunk/src/zc/queue/tests.py	2006-05-03 15:07:59 UTC (rev 67933)
@@ -0,0 +1,79 @@
+import unittest
+from zope.testing import doctest, module
+from ZODB import ConflictResolution, MappingStorage, POSException
+
+from zc import queue
+
+# TODO: this approach is useful, but fragile.  It also puts a dependency in
+# this package on the ZODB, when otherwise it would only depend on persistent.
+#
+# Other discussed testing approaches include passing values explicitly to the
+# queue's conflict resolution code, and having a simple database implemented in
+# the persistent package.
+#
+# This approach is arguably useful in two ways.  First, it confirms that the
+# conflict resolution code performs as expected in the desired environment,
+# the ZODB.  Second, in the doctest it shows real examples of the queue usage,
+# with transaction managers and all: this gives a clearer picture of the
+# full context in which this conflict resolution code must dance.
+
+class ConflictResolvingMappingStorage(
+    MappingStorage.MappingStorage,
+    ConflictResolution.ConflictResolvingStorage):
+
+    def __init__(self, name='ConflictResolvingMappingStorage'):
+        MappingStorage.MappingStorage.__init__(self, name)
+        self._old = {}
+
+    def loadSerial(self, oid, serial):
+        self._lock_acquire()
+        try:
+            old_info = self._old[oid]
+            try:
+                return old_info[serial]
+            except KeyError:
+                raise POSException.POSKeyError(oid)
+        finally:
+            self._lock_release()
+
+    def store(self, oid, serial, data, version, transaction):
+        if transaction is not self._transaction:
+            raise POSException.StorageTransactionError(self, transaction)
+
+        if version:
+            raise POSException.Unsupported("Versions aren't supported")
+
+        self._lock_acquire()
+        try:
+            if oid in self._index:
+                oserial = self._index[oid][:8]
+                if serial != oserial:
+                    rdata = self.tryToResolveConflict(
+                        oid, oserial, serial, data)
+                    if rdata is None:
+                        raise POSException.ConflictError(
+                            oid=oid, serials=(oserial, serial), data=data)
+                    else:
+                        data = rdata
+            self._tindex[oid] = self._tid + data
+        finally:
+            self._lock_release()
+        return self._tid
+
+    def _finish(self, tid, user, desc, ext):
+        self._index.update(self._tindex)
+        self._ltid = self._tid
+        for oid, record in self._tindex.items():
+            self._old.setdefault(oid, {})[self._tid] = record[8:]
+
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocFileSuite(
+            'queue.txt', globs={'Queue':queue.PersistentQueue}),
+        doctest.DocFileSuite(
+            'queue.txt',
+            globs={'Queue':lambda: queue.CompositePersistentQueue(2)}),
+        ))
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')



More information about the Checkins mailing list