[Checkins] SVN: zc.catalogqueue/branches/jim-saner-conflict-resolution/ New conflict resolution code that is more rational, I think. Needs tests.
Jim Fulton
jim at zope.com
Wed Jul 8 15:57:11 EDT 2009
Log message for revision 101756:
New conflict resolution code that is more rational, I think. Needs tests.
Changed:
U zc.catalogqueue/branches/jim-saner-conflict-resolution/buildout.cfg
U zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/CatalogEventQueue.py
U zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/tests/test_CatalogEventQueue.py
-=-
Modified: zc.catalogqueue/branches/jim-saner-conflict-resolution/buildout.cfg
===================================================================
--- zc.catalogqueue/branches/jim-saner-conflict-resolution/buildout.cfg 2009-07-08 19:55:37 UTC (rev 101755)
+++ zc.catalogqueue/branches/jim-saner-conflict-resolution/buildout.cfg 2009-07-08 19:57:10 UTC (rev 101756)
@@ -1,11 +1,7 @@
[buildout]
develop = .
parts = test py
-versions = versions
-[versions]
-ZODB3 = 3.8.0
-
[test]
recipe = zc.recipe.testrunner
eggs = zc.catalogqueue
Modified: zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/CatalogEventQueue.py
===================================================================
--- zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/CatalogEventQueue.py 2009-07-08 19:55:37 UTC (rev 101755)
+++ zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/CatalogEventQueue.py 2009-07-08 19:57:10 UTC (rev 101756)
@@ -2,14 +2,14 @@
#
# Copyright (c) 2002-2006 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
-#
+#
##############################################################################
"""
$Id$
@@ -38,7 +38,6 @@
ADDED_EVENTS = (CHANGED, ADDED, CHANGED_ADDED)
-
class CatalogEventQueue(Persistent):
"""Event queue for catalog events
@@ -59,7 +58,19 @@
CHANGED_ADDED -- Add object was added and subsequently changed.
This event is a consequence of the queue implementation.
-
+
+
+ |---------------------------------------------|
+ V V
+ ->[ADDED]-- change -->[CHANGED_ADDED]--remove-->[REMOVED]<-
+ ^ | ^
+ | | |
+ change [CHANGED]<-
+ ^ |
+ | ]
+ change
+
+
Note that, although we only keep track of the most recent
event. there are rules for how the most recent event can be
updated:
@@ -83,8 +94,8 @@
have no effect.
If we undo a transaction, we generate an anti-event. The anti
- event of ADDED id REMOVED, of REMOVED is ADDED, and of CHANGED is
- CHANGED.
+ event of ADDED is REMOVED, of REMOVED is ADDED, and of CHANGED is
+ CHANGED.
Note that these rules represent heuristics that attempt to provide
efficient and sensible behavior for most cases. They are not "correct" in
@@ -111,10 +122,11 @@
remove events.
- Queue processing transactions always remove events.
-
+
"""
_conflict_policy = SAFE_POLICY
+ _generation = 0
def __init__(self, conflict_policy=SAFE_POLICY):
@@ -127,14 +139,14 @@
def __len__(self):
return len(self._data)
-
+
def update(self, uid, etype):
assert etype in EVENT_TYPES
data = self._data
current = data.get(uid)
+
if current is not None:
- delta = 0
- generation, current = current
+ _, current = current
if current in ADDED_EVENTS and etype is ADDED:
raise TypeError("Attempt to add an object that is already "
"in the catalog")
@@ -145,15 +157,13 @@
if ((current is ADDED or current is CHANGED_ADDED)
and etype is CHANGED):
etype = CHANGED_ADDED
-
- else:
- delta = 1
- generation = 0
- data[uid] = generation+1, etype
+ if etype == current:
+ return 0 # no change
- self._p_changed = 1
- return delta
+ data[uid] = 0, etype
+ self._generation += 1
+ return 1
def getEvent(self, uid):
state = self._data.get(uid)
@@ -185,7 +195,8 @@
# Note that in the case of undo, the olddata is the data for
# the transaction being undone and newdata is the data for the
- # transaction previous to the undone transaction.
+ # transaction previous to the undone transaction. In this
+ # case, the old generation will be larger than the new.
# Find the conflict policy on the new state to make sure changes
# to it will be applied
@@ -196,97 +207,85 @@
committed_data = committed['_data']
newstate_data = newstate['_data']
- # Merge newstate changes into committed
- for uid, new in newstate_data.items():
+ try:
+ newgen = newstate['_generation']
+ oldgen = oldstate['_generation']
+ except KeyError:
+ logger.error('Queue conflict %r: no generation data.', uid)
+ raise ConflictError
- # Decide if this is a change
- old = oldstate_data.get(uid)
- current = committed_data.get(uid)
-
- if new != old:
- # something changed
+ changes = {}
- if old is not None:
- # got a repeat event
- if new[0] < old[0]:
- # This was an undo, so give the event the undo
- # time and convert to an anti event of the old
- # (undone) event.
- new = (0, antiEvent(old[1]))
- elif new[1] is ADDED:
- if policy == SAFE_POLICY:
- logger.error('Queue conflict on %s: ADDED on existing item' % uid)
- raise ConflictError
- else:
- if current and current[1] == REMOVED:
- new = current
- else:
- new = (current[0]+1, CHANGED_ADDED)
-
+ if newgen < oldgen:
+ # Undo
+ #
+ # newstate is a previous revision of oldstate. That is,
+ # oldstate is a later generation of newstate.
- # remove this event from old, so that we don't
- # mess with it later.
- del oldstate_data[uid]
- # Check aqainst current value. Either we want a
- # different event, in which case we give up, or we
- # do nothing.
- if current is not None:
- if current[1] != new[1]:
- if policy == SAFE_POLICY:
- # This is too complicated, bail
- logger.error('Queue conflict on %s' % uid)
- raise ConflictError
- elif REMOVED not in (new[1], current[1]):
- new = (current[0]+1, CHANGED_ADDED)
- committed_data[uid] = new
- elif ( current[0] < new[0] and
- new[1] == REMOVED ):
- committed_data[uid] = new
+ for uid, new in newstate_data.iteritems():
+ # If newstate has items not in old state, then that
+ # means that oldstate processed some events. We don't
+ # care about undo support for processing of old events.
+ if uid not in oldstate:
+ # undo of processing event, give up:
+ logger.error('Queue conflict %r: undo processing.', uid)
- # remove this event from old, so that we don't
- # mess with it later.
- if oldstate_data.get(uid) is not None:
- del oldstate_data[uid]
+ for uid, old in oldstate_data.iteritems():
+ # cases:
+ # - events in oldstatate not in new state. These are
+ # events added by old transaction. To undo them, we
+ # we need anti events.
+ # - events that are in both old and new, but that are
+ # different. Apply the anti event of old
+ # - events are same in old and new, ignore
- # nothing to do
+ new = newstate_data.get(uid)
+ if new == old:
continue
+ changes[uid] = antiEvent[old[1]]
- committed_data[uid] = new
+ else:
+ # Non undo.
+ #
+ # cases:
+ # - in old, but not in new. This means the new transaction
+ # processed the old event. We can ignore these cases.
+ # - in new, but not in old. We need to include these
+ # in the changes.
+ # - same in old and new. Not a change, ignore.
+ # Different. Apply the change (if we can).
+ for uid, new in newstate_data.iteritems():
+ old = oldstate_data.get(uid)
+ if old == new:
+ continue
+ changes[uid] = new[1]
- # Now handle remaining events in old that weren't in new.
- # These *must* be undone events!
- for uid, old in oldstate_data.items():
- new = (0, antiEvent(old[1]))
-
- # See above
- current = committed_data.get(uid)
- if current is not None:
- if current[1] != new[1]:
- # This is too complicated, bail
- logger.error('Queue conflict on %s processing undos' % uid)
- raise ConflictError
- # nothing to do
+ # OK, now we have a set of changes. See if we can combine them with
+ # the current.
+ for uid, change in changes.iteritems():
+ committed = committed_data.get(uid)
+ if committed is None:
+ # Not in committed data. We don't know what happened.
+ # Can't guess
+ logger.error(
+ "Queue conflict on %s: Can't guess about new event" % uid)
+ raise ConflictError
+ if committed == change:
continue
+ _, committed = committed
+ _, change = change
+ if committed == REMOVED or change == REMOVED:
+ logger.error(
+ "Queue conflict on %s: Change to removed data" % uid)
+ raise ConflictError
+ committed_data[uid] = 0, change
- committed_data[uid] = new
-
return { '_data': committed_data
, '_conflict_policy' : policy
}
-__doc__ = CatalogEventQueue.__doc__ + __doc__
+ def __repr__(self):
+ return "<%s %r>" % (self.__class__.__name__, self._data)
-
-
-# Old worries
-
-# We have a problem. We have to make sure that we don't lose too
-# much history to undo, but we don't want to retain the entire
-# history. We certainly don't want to execute the entire history
-# when we execute a trans.
-#
-# Baah, no worry, if we undo in a series of unprocessed events, we
-# simply restore the old event, which we have in the old state.
-
-
+__doc__ = CatalogEventQueue.__doc__ + __doc__
Modified: zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/tests/test_CatalogEventQueue.py
===================================================================
--- zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/tests/test_CatalogEventQueue.py 2009-07-08 19:55:37 UTC (rev 101755)
+++ zc.catalogqueue/branches/jim-saner-conflict-resolution/src/zc/catalogqueue/tests/test_CatalogEventQueue.py 2009-07-08 19:57:10 UTC (rev 101756)
@@ -2,14 +2,14 @@
#
# Copyright (c) 2002-2006 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
-#
+#
##############################################################################
import os
@@ -26,8 +26,9 @@
from zc.catalogqueue.CatalogEventQueue import REMOVED
from zc.catalogqueue.CatalogEventQueue import SAFE_POLICY
from zc.catalogqueue.CatalogEventQueue import ALTERNATIVE_POLICY
-from ZODB.POSException import ConflictError
+from ZODB.POSException import ConflictError
+from zope.testing import doctest, setupstack
class QueueConflictTests(unittest.TestCase):
@@ -39,8 +40,8 @@
self.assertEquals(self.queue._conflict_policy, ALTERNATIVE_POLICY)
self.assertEquals(self.queue2._conflict_policy, ALTERNATIVE_POLICY)
-
+
def _insane_update(self, queue, uid, etype):
# Queue update method that allows insane state changes, needed
# to provoke pathological queue states
@@ -151,7 +152,7 @@
# commit an ADDED event while the conflict resolution policy is
# NOT the SAFE_POLICY, we won't get a conflict.
self._setAlternativePolicy()
-
+
self.queue.update('/f0', ADDED)
self.queue.update('/f0', CHANGED)
self.queue._p_jar.transaction_manager.commit()
@@ -182,7 +183,7 @@
# commit an ADDED event while the conflict resolution policy is
# NOT the SAFE_POLICY, we won't get a conflict.
self._setAlternativePolicy()
-
+
self.queue.update('/f0', ADDED)
self.queue.update('/f0', CHANGED)
self.queue._p_jar.transaction_manager.commit()
@@ -228,12 +229,12 @@
def test_resolved_new_old_current_all_different(self):
# If the events we get from the current, new and old states are
- # all different and the SAFE_POLICY conflict resolution policy is
+ # all different and the SAFE_POLICY conflict resolution policy is
# not enforced, the conflict resolves without bloodshed.
# This test relies on the fact that no OLD state is de-facto treated
# as a state.
self._setAlternativePolicy()
-
+
self.queue.update('/f0', ADDED)
self.queue.update('/f0', CHANGED)
self.queue._p_jar.transaction_manager.commit()
@@ -242,8 +243,8 @@
self._insane_update(self.queue2, '/f0', REMOVED)
self.queue2._p_jar.transaction_manager.commit()
- # In this scenario (the incoming new state has a REMOVED event),
- # the new state is disregarded and the old state is used. We are
+ # In this scenario (the incoming new state has a REMOVED event),
+ # the new state is disregarded and the old state is used. We are
# left with a CHANGED_ADDED event. (see queue.update method; ADDED
# plus CHANGED results in CHANGED_ADDED)
self.queue._p_jar.sync()
@@ -280,10 +281,10 @@
def test_resolved_new_old_current_all_different_2(self):
# If the events we get from the current, new and old states are
- # all different and the SAFE_POLICY conflict resolution policy is
+ # all different and the SAFE_POLICY conflict resolution policy is
# not enforced, the conflict resolves without bloodshed.
self._setAlternativePolicy()
-
+
self.queue.update('/f0', ADDED)
self.queue.update('/f0', CHANGED)
self.queue._p_jar.transaction_manager.commit()
@@ -299,7 +300,7 @@
self._insane_update(self.queue2, '/f0', REMOVED)
self.queue2._p_jar.transaction_manager.commit()
- # In this scenario (the incoming new state has a REMOVED event),
+ # In this scenario (the incoming new state has a REMOVED event),
# we will take the new state to resolve the conflict, because its
# generation number is higher then the oldstate and current state.
self.queue._p_jar.sync()
@@ -311,10 +312,43 @@
self.failUnless(event1 == event2 == REMOVED)
+def conflict_with_repeated_events():
+ r"""
+ >>> import ZODB
+ >>> db = ZODB.DB('data.fs')
+ >>> import transaction
+ >>> conn1 = db.open()
+ >>> q1 = conn1.root()['q'] = CatalogEventQueue()
+ >>> q1.update('x', CHANGED)
+ 1
+ >>> transaction.commit()
+ >>> q1.update('x', CHANGED)
+ 1
+ >>> q1.update('y', CHANGED)
+ 1
+ >>> tm2 = transaction.TransactionManager()
+ >>> conn2 = db.open(transaction_manager=tm2)
+ >>> q2 = conn2.root()['q']
+ >>> q2.update('z', CHANGED)
+ 1
+ >>> tm2.commit()
+ >>> import zope.testing.loggingsupport
+ >>> handler = zope.testing.loggingsupport.InstalledHandler(
+ ... 'zc.catalogeventqueue.CatalogEventQueue')
+ >>> transaction.commit()
+ >>> q1
+ >>> print handler
+ >>> handler.uninstall()
+ >>> db.close()
+ """
+
+
def test_suite():
return unittest.TestSuite((
- unittest.makeSuite(QueueConflictTests),
- ))
+# doctest.DocTestSuite(
+# setUp=setupstack.setUpDirectory, tearDown=setupstack.tearDown),
+ unittest.makeSuite(QueueConflictTests),
+ ))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
More information about the Checkins
mailing list