[Checkins] SVN: zc.catalogqueue/branches/jim-saner-conflict-resolution/ New conflict resolution code that is more rational, I think. Needs tests.

Jim Fulton jim at zope.com
Wed Jul 8 15:57:11 EDT 2009


Log message for revision 101756:
  New conflict resolution code that is more rational, I think. Needs tests.

Changed:
  U   zc.catalogqueue/branches/jim-saner-conflict-resolution/buildout.cfg
  U   zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/CatalogEventQueue.py
  U   zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/tests/test_CatalogEventQueue.py

-=-
Modified: zc.catalogqueue/branches/jim-saner-conflict-resolution/buildout.cfg
===================================================================
--- zc.catalogqueue/branches/jim-saner-conflict-resolution/buildout.cfg	2009-07-08 19:55:37 UTC (rev 101755)
+++ zc.catalogqueue/branches/jim-saner-conflict-resolution/buildout.cfg	2009-07-08 19:57:10 UTC (rev 101756)
@@ -1,11 +1,7 @@
 [buildout]
 develop = .
 parts = test py
-versions = versions
 
-[versions]
-ZODB3 = 3.8.0
-
 [test]
 recipe = zc.recipe.testrunner
 eggs = zc.catalogqueue

Modified: zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/CatalogEventQueue.py
===================================================================
--- zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/CatalogEventQueue.py	2009-07-08 19:55:37 UTC (rev 101755)
+++ zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/CatalogEventQueue.py	2009-07-08 19:57:10 UTC (rev 101756)
@@ -2,14 +2,14 @@
 #
 # Copyright (c) 2002-2006 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE.
-# 
+#
 ##############################################################################
 """
 $Id$
@@ -38,7 +38,6 @@
 
 ADDED_EVENTS = (CHANGED, ADDED, CHANGED_ADDED)
 
-
 class CatalogEventQueue(Persistent):
     """Event queue for catalog events
 
@@ -59,7 +58,19 @@
 
     CHANGED_ADDED -- Add object was added and subsequently changed.
                      This event is a consequence of the queue implementation.
-                     
+
+
+        |---------------------------------------------|
+        V                                             V
+    ->[ADDED]-- change -->[CHANGED_ADDED]--remove-->[REMOVED]<-
+                             ^     |                   ^
+                             |     |                   |
+                              change                [CHANGED]<-
+                                                     ^    |
+                                                     |    |
+                                                     change
+
+
     Note that, although we only keep track of the most recent
     event. there are rules for how the most recent event can be
     updated:
@@ -83,8 +94,8 @@
       have no effect.
 
     If we undo a transaction, we generate an anti-event. The anti
-    event of ADDED id REMOVED, of REMOVED is ADDED, and of CHANGED is
-    CHANGED. 
+    event of ADDED is REMOVED, of REMOVED is ADDED, and of CHANGED is
+    CHANGED.
 
     Note that these rules represent heuristics that attempt to provide
     efficient and sensible behavior for most cases. They are not "correct" in
@@ -111,10 +122,11 @@
       remove events.
 
     - Queue processing transactions always remove events.
-    
+
     """
 
     _conflict_policy = SAFE_POLICY
+    _generation = 0
 
     def __init__(self, conflict_policy=SAFE_POLICY):
 
@@ -127,14 +139,14 @@
 
     def __len__(self):
         return len(self._data)
-        
+
     def update(self, uid, etype):
         assert etype in EVENT_TYPES
         data = self._data
         current = data.get(uid)
+
         if current is not None:
-            delta = 0
-            generation, current = current
+            _, current = current
             if current in ADDED_EVENTS and etype is ADDED:
                 raise TypeError("Attempt to add an object that is already "
                                 "in the catalog")
@@ -145,15 +157,13 @@
             if ((current is ADDED or current is CHANGED_ADDED)
                 and etype is CHANGED):
                 etype = CHANGED_ADDED
-                
-        else:
-            delta = 1
-            generation = 0
 
-        data[uid] = generation+1, etype
+            if etype == current:
+                return 0 # no change
 
-        self._p_changed = 1
-        return delta
+        data[uid] = 0, etype
+        self._generation += 1
+        return 1
 
     def getEvent(self, uid):
         state = self._data.get(uid)
@@ -185,7 +195,8 @@
 
         # Note that in the case of undo, the olddata is the data for
         # the transaction being undone and newdata is the data for the
-        # transaction previous to the undone transaction.
+        # transaction previous to the undone transaction. In this
+        # case, the old generation will be larger than the new.
 
         # Find the conflict policy on the new state to make sure changes
         # to it will be applied
@@ -196,97 +207,85 @@
         committed_data = committed['_data']
         newstate_data  =  newstate['_data']
 
-        # Merge newstate changes into committed
-        for uid, new in newstate_data.items():
+        try:
+            newgen = newstate['_generation']
+            oldgen = oldstate['_generation']
+        except KeyError:
+            logger.error('Queue conflict: no generation data.')
+            raise ConflictError
 
-            # Decide if this is a change
-            old = oldstate_data.get(uid)
-            current = committed_data.get(uid)
-            
-            if new != old:
-                # something changed
+        changes = {}
 
-                if old is not None:
-                    # got a repeat event
-                    if new[0] < old[0]:
-                        # This was an undo, so give the event the undo
-                        # time and convert to an anti event of the old
-                        # (undone) event. 
-                        new = (0, antiEvent(old[1]))
-                    elif new[1] is ADDED:
-                        if policy == SAFE_POLICY:
-                            logger.error('Queue conflict on %s: ADDED on existing item' % uid)
-                            raise ConflictError
-                        else:
-                            if current and current[1] == REMOVED:
-                                new = current
-                            else:
-                                new = (current[0]+1, CHANGED_ADDED)
-                            
+        if newgen < oldgen:
+            # Undo
+            #
+            # newstate is a previous revision of oldstate.  That is,
+            # oldstate is a later generation of newstate.
 
-                    # remove this event from old, so that we don't
-                    # mess with it later.
-                    del oldstate_data[uid]
 
-                # Check aqainst current value. Either we want a
-                # different event, in which case we give up, or we
-                # do nothing.
-                if current is not None:
-                    if current[1] != new[1]:
-                        if policy == SAFE_POLICY:
-                            # This is too complicated, bail
-                            logger.error('Queue conflict on %s' % uid)
-                            raise ConflictError
-                        elif REMOVED not in (new[1], current[1]):
-                            new = (current[0]+1, CHANGED_ADDED)
-                            committed_data[uid] = new
-                        elif ( current[0] < new[0] and
-                               new[1] == REMOVED ):
-                            committed_data[uid] = new
+            for uid, new in newstate_data.iteritems():
+                # If newstate has items not in old state, then that
+                # means that oldstate processed some events.  We don't
+                # care about undo support for processing of old events.
+                if uid not in oldstate_data:  # undo of processing; give up
+                    logger.error('Queue conflict %r: undo processing.', uid)
+                    raise ConflictError
 
-                        # remove this event from old, so that we don't
-                        # mess with it later.
-                        if oldstate_data.get(uid) is not None:
-                            del oldstate_data[uid]
+            for uid, old in oldstate_data.iteritems():
+                # cases:
+                # - events in oldstate not in newstate. These are
+                #   events added by the old transaction. To undo them,
+                #   we need anti events.
+                # - events that are in both old and new, but that are
+                #   different.  Apply the anti event of old
+                # - events are same in old and new, ignore
 
-                    # nothing to do
+                new = newstate_data.get(uid)
+                if new == old:
                     continue
+                changes[uid] = antiEvent(old[1])
 
-                committed_data[uid] = new
+        else:
+            # Non undo.
+            #
+            # cases:
+            # - in old, but not in new. This means the new transaction
+            #   processed the old event.  We can ignore these cases.
+            # - in new, but not in old.  We need to include these
+            #   in the changes.
+            # - same in old and new. Not a change, ignore.
+            #   Different.  Apply the change (if we can).
+            for uid, new in newstate_data.iteritems():
+                old = oldstate_data.get(uid)
+                if old == new:
+                    continue
+                changes[uid] = new[1]
 
-        # Now handle remaining events in old that weren't in new.
-        # These *must* be undone events!
-        for uid, old in oldstate_data.items():
-            new = (0, antiEvent(old[1]))
-            
-            # See above
-            current = committed_data.get(uid)
-            if current is not None:
-                if current[1] != new[1]:
-                    # This is too complicated, bail
-                    logger.error('Queue conflict on %s processing undos' % uid)
-                    raise ConflictError
-                # nothing to do
+        # OK, now we have a set of changes. See if we can combine them with
+        # the current.
+        for uid, change in changes.iteritems():
+            committed = committed_data.get(uid)
+            if committed is None and uid not in oldstate_data:
+                committed_data[uid] = 0, change  # brand-new event
+                continue
+            if committed is None:  # in old but not committed: can't guess
+                logger.error("Queue conflict on %s: Can't guess about new event" % uid)
+                raise ConflictError
+            if committed[1] == change:
                 continue
+            # changes holds bare event types; committed is (generation, etype)
+            _, committed = committed
+            if committed == REMOVED or change == REMOVED:
+                logger.error(
+                    "Queue conflict on %s: Change to removed data" % uid)
+                raise ConflictError
+            committed_data[uid] = 0, change
 
-            committed_data[uid] = new
-
         return { '_data': committed_data
                , '_conflict_policy' : policy
                }
 
-__doc__ = CatalogEventQueue.__doc__ + __doc__
+    def __repr__(self):
+        return "<%s %r>" % (self.__class__.__name__, self._data)
 
-
-
-# Old worries
-
-# We have a problem. We have to make sure that we don't lose too
-# much history to undo, but we don't want to retain the entire
-# history. We certainly don't want to execute the entire history
-# when we execute a trans.
-#
-# Baah, no worry, if we undo in a series of unprocessed events, we
-# simply restore the old event, which we have in the old state.
-
-
+__doc__ = CatalogEventQueue.__doc__ + __doc__

Modified: zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/tests/test_CatalogEventQueue.py
===================================================================
--- zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/tests/test_CatalogEventQueue.py	2009-07-08 19:55:37 UTC (rev 101755)
+++ zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/tests/test_CatalogEventQueue.py	2009-07-08 19:57:10 UTC (rev 101756)
@@ -2,14 +2,14 @@
 #
 # Copyright (c) 2002-2006 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE.
-# 
+#
 ##############################################################################
 
 import os
@@ -26,8 +26,9 @@
 from zc.catalogqueue.CatalogEventQueue import REMOVED
 from zc.catalogqueue.CatalogEventQueue import SAFE_POLICY
 from zc.catalogqueue.CatalogEventQueue import ALTERNATIVE_POLICY
-from ZODB.POSException import ConflictError 
+from ZODB.POSException import ConflictError
 
+from zope.testing import doctest, setupstack
 
 class QueueConflictTests(unittest.TestCase):
 
@@ -39,8 +40,8 @@
 
         self.assertEquals(self.queue._conflict_policy, ALTERNATIVE_POLICY)
         self.assertEquals(self.queue2._conflict_policy, ALTERNATIVE_POLICY)
- 
 
+
     def _insane_update(self, queue, uid, etype):
         # Queue update method that allows insane state changes, needed
         # to provoke pathological queue states
@@ -151,7 +152,7 @@
         # commit an ADDED event while the conflict resolution policy is
         # NOT the SAFE_POLICY, we won't get a conflict.
         self._setAlternativePolicy()
-        
+
         self.queue.update('/f0', ADDED)
         self.queue.update('/f0', CHANGED)
         self.queue._p_jar.transaction_manager.commit()
@@ -182,7 +183,7 @@
         # commit an ADDED event while the conflict resolution policy is
         # NOT the SAFE_POLICY, we won't get a conflict.
         self._setAlternativePolicy()
-        
+
         self.queue.update('/f0', ADDED)
         self.queue.update('/f0', CHANGED)
         self.queue._p_jar.transaction_manager.commit()
@@ -228,12 +229,12 @@
 
     def test_resolved_new_old_current_all_different(self):
         # If the events we get from the current, new and old states are
-        # all different and the SAFE_POLICY conflict resolution policy is 
+        # all different and the SAFE_POLICY conflict resolution policy is
         # not enforced, the conflict resolves without bloodshed.
         # This test relies on the fact that no OLD state is de-facto treated
         # as a state.
         self._setAlternativePolicy()
- 
+
         self.queue.update('/f0', ADDED)
         self.queue.update('/f0', CHANGED)
         self.queue._p_jar.transaction_manager.commit()
@@ -242,8 +243,8 @@
         self._insane_update(self.queue2, '/f0', REMOVED)
         self.queue2._p_jar.transaction_manager.commit()
 
-        # In this scenario (the incoming new state has a REMOVED event), 
-        # the new state is disregarded and the old state is used. We are 
+        # In this scenario (the incoming new state has a REMOVED event),
+        # the new state is disregarded and the old state is used. We are
         # left with a CHANGED_ADDED event. (see queue.update method; ADDED
         # plus CHANGED results in CHANGED_ADDED)
         self.queue._p_jar.sync()
@@ -280,10 +281,10 @@
 
     def test_resolved_new_old_current_all_different_2(self):
         # If the events we get from the current, new and old states are
-        # all different and the SAFE_POLICY conflict resolution policy is 
+        # all different and the SAFE_POLICY conflict resolution policy is
         # not enforced, the conflict resolves without bloodshed.
         self._setAlternativePolicy()
- 
+
         self.queue.update('/f0', ADDED)
         self.queue.update('/f0', CHANGED)
         self.queue._p_jar.transaction_manager.commit()
@@ -299,7 +300,7 @@
         self._insane_update(self.queue2, '/f0', REMOVED)
         self.queue2._p_jar.transaction_manager.commit()
 
-        # In this scenario (the incoming new state has a REMOVED event), 
+        # In this scenario (the incoming new state has a REMOVED event),
         # we will take the new state to resolve the conflict, because its
         # generation number is higher then the oldstate and current state.
         self.queue._p_jar.sync()
@@ -311,10 +312,43 @@
         self.failUnless(event1 == event2 == REMOVED)
 
 
+def conflict_with_repeated_events():
+    r"""
+    >>> import ZODB
+    >>> db = ZODB.DB('data.fs')
+    >>> import transaction
+    >>> conn1 = db.open()
+    >>> q1 = conn1.root()['q'] = CatalogEventQueue()
+    >>> q1.update('x', CHANGED)
+    1
+    >>> transaction.commit()
+    >>> q1.update('x', CHANGED)
+    1
+    >>> q1.update('y', CHANGED)
+    1
+    >>> tm2 = transaction.TransactionManager()
+    >>> conn2 = db.open(transaction_manager=tm2)
+    >>> q2 = conn2.root()['q']
+    >>> q2.update('z', CHANGED)
+    1
+    >>> tm2.commit()
+    >>> import zope.testing.loggingsupport
+    >>> handler = zope.testing.loggingsupport.InstalledHandler(
+    ...    'zc.catalogeventqueue.CatalogEventQueue')
+    >>> transaction.commit()
+    >>> q1
+    >>> print handler
+    >>> handler.uninstall()
+    >>> db.close()
+    """
+
+
 def test_suite():
     return unittest.TestSuite((
-            unittest.makeSuite(QueueConflictTests),
-                    ))
+#         doctest.DocTestSuite(
+#             setUp=setupstack.setUpDirectory, tearDown=setupstack.tearDown),
+        unittest.makeSuite(QueueConflictTests),
+        ))
 
 if __name__ == '__main__':
     unittest.main(defaultTest='test_suite')



More information about the Checkins mailing list