[Checkins] SVN: zc.queue/trunk/ - Fixed a conflict resolution bug in CompositeQueue

Gary Poster gary at zope.com
Mon Jun 25 22:55:47 EDT 2007


Log message for revision 77086:
  - Fixed a conflict resolution bug in CompositeQueue
  
  - Renamed PersistentQueue to Queue, CompositePersistentQueue to
    CompositeQueue. The old names are nominally deprecated, although no
    warnings are generated and there are no current plans to eliminate
    them.  The PersistentQueue class has more conservative conflict
    resolution than it used to.  (The Queue class has the same conflict
    resolution as the PersistentQueue used to have.)
  
  

Changed:
  U   zc.queue/trunk/CHANGES.txt
  U   zc.queue/trunk/setup.py
  U   zc.queue/trunk/src/zc/queue/__init__.py
  U   zc.queue/trunk/src/zc/queue/_queue.py
  U   zc.queue/trunk/src/zc/queue/queue.txt
  U   zc.queue/trunk/src/zc/queue/tests.py

-=-
Modified: zc.queue/trunk/CHANGES.txt
===================================================================
--- zc.queue/trunk/CHANGES.txt	2007-06-26 02:53:02 UTC (rev 77085)
+++ zc.queue/trunk/CHANGES.txt	2007-06-26 02:55:46 UTC (rev 77086)
@@ -1,8 +1,27 @@
+=======
+CHANGES
+=======
+
+1.1
+===
+
+- Fixed a conflict resolution bug in CompositeQueue
+
+- Renamed PersistentQueue to Queue, CompositePersistentQueue to
+  CompositeQueue. The old names are nominally deprecated, although no
+  warnings are generated and there are no current plans to eliminate
+  them.  The PersistentQueue class has more conservative conflict
+  resolution than it used to.  (The Queue class has the same conflict
+  resolution as the PersistentQueue used to have.)
+
 1.0.1
+=====
 
-Minor buildout changes
-Initial release to PyPI
+- Minor buildout changes
 
+- Initial release to PyPI
+
 1.0
+===
 
-Initial release to zope.org
+- Initial release to zope.org

Modified: zc.queue/trunk/setup.py
===================================================================
--- zc.queue/trunk/setup.py	2007-06-26 02:53:02 UTC (rev 77085)
+++ zc.queue/trunk/setup.py	2007-06-26 02:55:46 UTC (rev 77086)
@@ -2,7 +2,7 @@
 
 setup(
     name="zc.queue",
-    version="1.0.2-dev",
+    version="1.1",
     license="ZPL 2.1",
     author="Zope Project",
     author_email="zope3-dev at zope.org",
@@ -14,7 +14,9 @@
     install_requires=["zope.interface", "ZODB3"],
     tests_require=["zope.testing"],
     description=open('README.txt').read(),
-    long_description=open("src/zc/queue/queue.txt").read(),
+    long_description=(
+        open("CHANGES.txt").read()+
+        open("src/zc/queue/queue.txt").read()),
     keywords="zope zope3",
     zip_safe=False
     )

Modified: zc.queue/trunk/src/zc/queue/__init__.py
===================================================================
--- zc.queue/trunk/src/zc/queue/__init__.py	2007-06-26 02:53:02 UTC (rev 77085)
+++ zc.queue/trunk/src/zc/queue/__init__.py	2007-06-26 02:55:46 UTC (rev 77086)
@@ -1,2 +1,3 @@
-from zc.queue._queue import PersistentQueue, CompositePersistentQueue
+from zc.queue._queue import (
+    Queue, CompositeQueue, PersistentQueue, CompositePersistentQueue)
 

Modified: zc.queue/trunk/src/zc/queue/_queue.py
===================================================================
--- zc.queue/trunk/src/zc/queue/_queue.py	2007-06-26 02:53:02 UTC (rev 77085)
+++ zc.queue/trunk/src/zc/queue/_queue.py	2007-06-26 02:55:46 UTC (rev 77086)
@@ -5,7 +5,7 @@
 
 from zc.queue import interfaces
 
-class PersistentQueue(Persistent):
+class Queue(Persistent):
 
     interface.implements(interfaces.IQueue)
 
@@ -38,16 +38,25 @@
         return bool(self._data)
 
     def _p_resolveConflict(self, oldstate, committedstate, newstate):
-        return resolveQueueConflict(oldstate, committedstate, newstate)
+        return resolveQueueConflict(
+            oldstate, committedstate, newstate)
 
-def resolveQueueConflict(oldstate, committedstate, newstate):
+class BucketQueue(Queue):
+
+    def _p_resolveConflict(self, oldstate, committedstate, newstate):
+        return resolveQueueConflict(
+            oldstate, committedstate, newstate, True)
+
+PersistentQueue = BucketQueue # for legacy instances, be conservative
+
+def resolveQueueConflict(oldstate, committedstate, newstate, bucket=False):
     # we only know how to merge _data.  If anything else is different,
     # puke.
     if set(committedstate.keys()) != set(newstate.keys()):
-        raise ConflictError
+        raise ConflictError # can't resolve
     for key, val in newstate.items():
         if key != '_data' and val != committedstate[key]:
-            raise ConflictError
+            raise ConflictError # can't resolve
     # basically, we are ok with anything--willing to merge--
     # unless committedstate and newstate have one or more of the
     # same deletions or additions in comparison to the oldstate.
@@ -59,6 +68,15 @@
     committed_set = set(committed)
     new_set = set(new)
 
+    if bucket and bool(old_set) and (bool(committed_set) ^ bool(new_set)):
+        # This is a bucket, part of a CompositeQueue.  The old set
+        # of this bucket had items, and one of the two transactions cleaned
+        # it out.  There's a reasonable chance that this bucket will be
+        # cleaned out by the parent in one of the two new transactions.
+        # We can't know for sure, so we take the conservative route of
+        # refusing to be resolvable.
+        raise ConflictError
+
     committed_added = committed_set - old_set
     committed_removed = old_set - committed_set
     new_added = new_set - old_set
@@ -66,10 +84,10 @@
 
     if new_removed & committed_removed:
         # they both removed (claimed) the same one.  Puke.
-        raise ConflictError
+        raise ConflictError # can't resolve
     elif new_added & committed_added:
         # they both added the same one.  Puke.
-        raise ConflictError
+        raise ConflictError # can't resolve
     # Now we do the merge.  We'll merge into the committed state and
     # return it.
     mod_committed = []
@@ -83,7 +101,8 @@
     committedstate['_data'] = tuple(mod_committed)
     return committedstate
 
-class CompositePersistentQueue(Persistent):
+
+class CompositeQueue(Persistent):
     """Appropriate for queues that may become large.
     
     Using this queue has one advantage and two possible disadvantages.
@@ -115,7 +134,7 @@
 
     interface.implements(interfaces.IQueue)
 
-    def __init__(self, compositeSize=15, subfactory=PersistentQueue):
+    def __init__(self, compositeSize=15, subfactory=BucketQueue):
         # the compositeSize value is a ballpark.  Because of the merging
         # policy, a composite queue might get as big as 2n under unusual
         # circumstances.  A better name for this might be "splitSize"...
@@ -198,3 +217,4 @@
     def _p_resolveConflict(self, oldstate, committedstate, newstate):
         return resolveQueueConflict(oldstate, committedstate, newstate)
 
+CompositePersistentQueue = CompositeQueue # legacy

Modified: zc.queue/trunk/src/zc/queue/queue.txt
===================================================================
--- zc.queue/trunk/src/zc/queue/queue.txt	2007-06-26 02:53:02 UTC (rev 77085)
+++ zc.queue/trunk/src/zc/queue/queue.txt	2007-06-26 02:55:46 UTC (rev 77086)
@@ -15,9 +15,9 @@
 string "hello" within the same queue at once [#why]_.
 
 The module provides two flavors: a simple persistent queue that keeps all
-contained objects in one persistent object (`PersistentQueue`), and a
+contained objects in one persistent object (`Queue`), and a
 persistent queue that divides up its contents into multiple composite
-elements (`CompositePersistentQueue`). They should be equivalent in terms of
+elements (`CompositeQueue`). They should be equivalent in terms of
 API and so are mostly examined in the abstract in this document: we'll generate
 instances with a representative `Queue` factory, that could be either class.
 They only differ in an aspect of their write conflict resolution behavior,
@@ -162,12 +162,8 @@
     >>> connection_1 = db.open(transaction_manager=transactionmanager_1)
     >>> root_1 = connection_1.root()
 
-    >>> q = Queue()
-    >>> q.__name__ = "queue"
-    >>> root_1["queue"] = q
-    >>> del q
+    >>> q_1 = root_1["queue"] = Queue()
     >>> transactionmanager_1.commit()
-    >>> q_1 = root_1['queue']
 
     >>> transactionmanager_2 = transaction.TransactionManager()
     >>> connection_2 = db.open(transaction_manager=transactionmanager_2)
@@ -201,11 +197,11 @@
 
 For commits of more than one additions per connection of two, or of more than
 two concurrent adding transactions, the behavior is the same for the
-PersistentQueue: the first commit's additions will go before the second
+Queue: the first commit's additions will go before the second
 commit's.
 
     >>> from zc import queue
-    >>> if isinstance(q_1, queue.PersistentQueue):
+    >>> if isinstance(q_1, queue.Queue):
     ...     for i in range(5):
     ...         q_1.put(i)
     ...     for i in range(1002, 1005):
@@ -220,12 +216,12 @@
 commit earlier in the queue, to result in
 [1000, 1001, 1002, 1003, 1004, 0, 1, 2, 3, 4].
 
-For the CompositePersistentQueue, the behavior is different.  The order
+For the CompositeQueue, the behavior is different.  The order
 will be maintained with a set of additions in a transaction, but the values
 may be merged between the two transactions' additions.  We will compensate
 for that here to get a reliable queue state.
 
-    >>> if isinstance(q_1, queue.CompositePersistentQueue):
+    >>> if isinstance(q_1, queue.CompositeQueue):
     ...     for i1, i2 in ((1002, 1003), (1004, 0), (1, 2), (3, 4)):
     ...         q_1.put(i1)
     ...         q_2.put(i2)
@@ -305,6 +301,11 @@
     >>> list(q_2)
     [1002, 1003, 1004, 1, 2, 3, 4, 5]
 
+There's actually a special case: the composite queue's buckets will refuse to
+merge if they started with a non-empty state, and one of the two new states
+is empty.  This is to prevent the loss of an addition to the queue.  See
+tests.py for an example.
+
 Also importantly, users can concurrently remove and add items to a queue.
 
     >>> q_1.pull()
@@ -354,24 +355,24 @@
     >>> connection_1.sync()
     >>> connection_2.sync()
 
-After these commits, if the queue is a PersistentQueue, the new values are
+After these commits, if the queue is a Queue, the new values are
 in the order of their commit.  However, as discussed above, if the queue is
-a CompositePersistentQueue the behavior is different. While the order will be
+a CompositeQueue the behavior is different. While the order will be
 maintained comparitively--so if object `A` is ahead of object `B` in the queue
 on commit then `A` will still be ahead of `B` after a merge of the conflicting
 transactions--values may be interspersed between the two transactions.
 
-For instance, if our example queue were a PersistentQueue, the values would
+For instance, if our example queue were a Queue, the values would
 be [8, 9, 10, 11, 12, 13, 14, 15].  However, if it were a
-CompositePersistentQueue, the values might be the same, or might be any
+CompositeQueue, the values might be the same, or might be any
 combination in which [8, 9, 10, 11] and [12, 13, 14, 15], from the two
 transactions, are still in order.  One ordering might be
 [8, 9, 12, 13, 10, 11, 14, 15], for instance.
 
-    >>> if isinstance(q_1, queue.PersistentQueue):
+    >>> if isinstance(q_1, queue.Queue):
     ...     res_1 = list(q_1)
     ...     res_2 = list(q_2)
-    ... elif isinstance(q_1, queue.CompositePersistentQueue):
+    ... elif isinstance(q_1, queue.CompositeQueue):
     ...     firstsrc_1 = list(q_1)
     ...     firstsrc_2 = list(q_2)
     ...     secondsrc_1 = firstsrc_1[:]

Modified: zc.queue/trunk/src/zc/queue/tests.py
===================================================================
--- zc.queue/trunk/src/zc/queue/tests.py	2007-06-26 02:53:02 UTC (rev 77085)
+++ zc.queue/trunk/src/zc/queue/tests.py	2007-06-26 02:55:46 UTC (rev 77086)
@@ -66,13 +66,82 @@
         for oid, record in self._tindex.items():
             self._old.setdefault(oid, {})[self._tid] = record[8:]
 
+def test_deleted_bucket():
+    """As described in ZODB/ConflictResolution.txt, you need to be very
+    careful of objects that are composites of other persistent objects.
+    Without careful code, the following situation can cause an item in the
+    queue to be lost.
+    
+        >>> import transaction # setup...
+        >>> from ZODB import DB
+        >>> db = DB(ConflictResolvingMappingStorage('test'))
+        >>> transactionmanager_1 = transaction.TransactionManager()
+        >>> transactionmanager_2 = transaction.TransactionManager()
+        >>> connection_1 = db.open(transaction_manager=transactionmanager_1)
+        >>> root_1 = connection_1.root()
+
+        >>> q_1 = root_1["q"] = queue.CompositeQueue()
+        >>> q_1.put(1)
+        >>> transactionmanager_1.commit()
+    
+        >>> transactionmanager_2 = transaction.TransactionManager()
+        >>> connection_2 = db.open(transaction_manager=transactionmanager_2)
+        >>> root_2 = connection_2.root()
+        >>> q_2 = root_2["q"]
+        >>> q_1.pull()
+        1
+        >>> q_2.put(2)
+        >>> transactionmanager_2.commit()
+        >>> transactionmanager_1.commit() # doctest: +ELLIPSIS
+        Traceback (most recent call last):
+        ...
+        ConflictError: ...
+
+    Without this conservative conflict behavior, the queue would be empty!
+
+    With a simple queue, this will merge normally.
+
+        >>> transactionmanager_1.abort()
+        >>> q_1 = root_1["q"] = queue.Queue()
+        >>> q_1.put(1)
+        >>> transactionmanager_1.commit()
+    
+        >>> transactionmanager_2 = transaction.TransactionManager()
+        >>> connection_2 = db.open(transaction_manager=transactionmanager_2)
+        >>> root_2 = connection_2.root()
+        >>> q_2 = root_2["q"]
+        >>> q_1.pull()
+        1
+        >>> q_2.put(2)
+        >>> transactionmanager_2.commit()
+        >>> transactionmanager_1.commit()
+        >>> list(q_1)
+        [2]
+
+    """
+
+def test_legacy():
+    """We used to promote the names PersistentQueue and
+    CompositePersistentQueue as the expected names for the classes in this
+    package.  They are now shortened, but the older names should stay
+    available in _queue in perpetuity.
+    
+        >>> import zc.queue._queue
+        >>> zc.queue._queue.BucketQueue is queue.PersistentQueue
+        True
+        >>> queue.CompositeQueue is queue.CompositePersistentQueue
+        True
+
+    """
+
 def test_suite():
     return unittest.TestSuite((
         doctest.DocFileSuite(
-            'queue.txt', globs={'Queue':queue.PersistentQueue}),
+            'queue.txt', globs={'Queue':queue.Queue}),
         doctest.DocFileSuite(
             'queue.txt',
-            globs={'Queue':lambda: queue.CompositePersistentQueue(2)}),
+            globs={'Queue':lambda: queue.CompositeQueue(2)}),
+        doctest.DocTestSuite()
         ))
 
 if __name__ == '__main__':



More information about the Checkins mailing list