[Checkins] SVN: zc.catalogqueue/branches/dev/ Ported Zope 2's
QueuedCatalog to Zope 3 giving it a lower-level
Jim Fulton
jim at zope.com
Wed Apr 16 18:57:31 EDT 2008
Log message for revision 85445:
Ported Zope 2's QueuedCatalog to Zope 3 giving it a lower-level
(simpler) API. The core module, CatalogEventQueue, with its tests, is
copied almost verbatum from Products.QueuedCatalog.
Changed:
U zc.catalogqueue/branches/dev/buildout.cfg
U zc.catalogqueue/branches/dev/setup.py
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/CatalogEventQueue.py
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_CatalogEventQueue.py
A zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py
-=-
Modified: zc.catalogqueue/branches/dev/buildout.cfg
===================================================================
--- zc.catalogqueue/branches/dev/buildout.cfg 2008-04-16 19:57:40 UTC (rev 85444)
+++ zc.catalogqueue/branches/dev/buildout.cfg 2008-04-16 22:57:30 UTC (rev 85445)
@@ -1,10 +1,14 @@
[buildout]
develop = .
parts = test py
+versions = versions
+[versions]
+ZODB3 = 3.8.0
+
[test]
recipe = zc.recipe.testrunner
-eggs =
+eggs = zc.catalogqueue
[py]
recipe = zc.recipe.egg
Modified: zc.catalogqueue/branches/dev/setup.py
===================================================================
--- zc.catalogqueue/branches/dev/setup.py 2008-04-16 19:57:40 UTC (rev 85444)
+++ zc.catalogqueue/branches/dev/setup.py 2008-04-16 22:57:30 UTC (rev 85445)
@@ -22,14 +22,19 @@
)).read()
long_description = (
- read('src/zc/?/README.txt')
+ read('src/zc/catalogqueue/README.txt')
+ '\n' +
+ 'Detailed Documentation\n'
+ '**********************\n'
+ + '\n' +
+ read('src/zc/catalogqueue/queue.txt')
+ + '\n' +
'Download\n'
- '--------\n'
+ '********\n'
)
setup(
- name = '',
+ name = 'zc.catalogqueue',
version = '0.1',
author = 'Jim Fulton',
author_email = 'jim at zope.com',
@@ -40,7 +45,10 @@
packages = find_packages('src'),
namespace_packages = ['zc'],
package_dir = {'': 'src'},
- install_requires = ['setuptools'],
+ install_requires = [
+ 'setuptools',
+ 'ZODB3',
+ ],
zip_safe = False,
entry_points=entry_points,
include_package_data = True,
Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,7 @@
+Change History
+**************
+
+0.1 (2008-04-16)
+================
+
+Initial release
Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Copied: zc.catalogqueue/branches/dev/src/zc/catalogqueue/CatalogEventQueue.py (from rev 85444, Products.QueueCatalog/trunk/CatalogEventQueue.py)
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/CatalogEventQueue.py (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/CatalogEventQueue.py 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,289 @@
+##############################################################################
+#
+# Copyright (c) 2002-2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+$Id$
+"""
+
+import logging
+
+from persistent import Persistent
+from ZODB.POSException import ConflictError
+
+logger = logging.getLogger(__name__)
+
+SAFE_POLICY = 0
+ALTERNATIVE_POLICY = 1
+
+REMOVED = 0
+ADDED = 1
+CHANGED = 2
+CHANGED_ADDED = 3
+EVENT_TYPES = (REMOVED, CHANGED, ADDED, CHANGED_ADDED)
+antiEvent = {REMOVED: ADDED,
+ ADDED: REMOVED,
+ CHANGED: CHANGED,
+ CHANGED_ADDED: CHANGED_ADDED,
+ }.get
+
+ADDED_EVENTS = (CHANGED, ADDED, CHANGED_ADDED)
+
+
+class CatalogEventQueue(Persistent):
+ """Event queue for catalog events
+
+ This is a rather odd queue. It organizes events by object, where
+ objects are identified by uids, which happen to be string paths.
+
+ One way that this queue is extremely odd is that it really only
+ keeps track of the last event for an object. This is because we
+ really only *care* about the last event for an object.
+
+ There are three types of events:
+
+ ADDED -- An object was added to the catalog
+
+ CHANGED -- An object was changed
+
+ REMOVED -- An object was removed from the catalog
+
+ CHANGED_ADDED -- Add object was added and subsequently changed.
+ This event is a consequence of the queue implementation.
+
+ Note that, although we only keep track of the most recent
+ event. there are rules for how the most recent event can be
+ updated:
+
+ - It is illegal to update an ADDED, CHANGED, or CHANGED_ADDED
+ event with an ADDED event or
+
+ - to update a REMOVED event with a CHANGED event.
+
+ We have a problem because applications don't really indicate
+ whether they are are adding, or just updating. We deduce add
+ events by examining the catalog and event queue states.
+
+ Also note that, when events are applied to the catalog, events may
+ have no effect.
+
+ - If an object is in the catalog, ADDED events are equivalent to
+ CHANGED events.
+
+ - If an object is not in the catalog, REMOVED and CHANGED events
+ have no effect.
+
+ If we undo a transaction, we generate an anti-event. The anti
+ event of ADDED id REMOVED, of REMOVED is ADDED, and of CHANGED is
+ CHANGED.
+
+ Note that these rules represent heuristics that attempt to provide
+ efficient and sensible behavior for most cases. They are not "correct" in
+ that they handle cases that may not seem handleable. For example,
+ consider a sequence of transactions:
+
+ T1 adds an object
+ T2 removes the object
+ T3 adds the object
+ T4 processes the queue
+ T5 undoes T1
+
+ It's not clear what should be done in this case? We decide to
+ generate a remove event, even though a later transaction added the
+ object again. Is this correct? It's hard to say. The decision we
+ make is not horrible and it allows us to provide a very efficient
+ implementation. See the unit tests for other scenarios. Feel
+ free to think of cases for which our decisions are unacceptably
+ wrong and write unit tests for these cases.
+
+ There are two kinds of transactions that affect the queue:
+
+ - Application transactions always add or modify events. They never
+ remove events.
+
+ - Queue processing transactions always remove events.
+
+ """
+
+ _conflict_policy = SAFE_POLICY
+
+ def __init__(self, conflict_policy=SAFE_POLICY):
+
+ # Mapping from uid -> (generation, event type)
+ self._data = {}
+ self._conflict_policy = conflict_policy
+
+ def __nonzero__(self):
+ return not not self._data
+
+ def __len__(self):
+ return len(self._data)
+
+ def update(self, uid, etype):
+ assert etype in EVENT_TYPES
+ data = self._data
+ current = data.get(uid)
+ if current is not None:
+ generation, current = current
+ if current in ADDED_EVENTS and etype is ADDED:
+ raise TypeError("Attempt to add an object that is already "
+ "in the catalog")
+ if current is REMOVED and etype is CHANGED:
+ raise TypeError("Attempt to change an object that has "
+ "been removed")
+
+ if ((current is ADDED or current is CHANGED_ADDED)
+ and etype is CHANGED):
+ etype = CHANGED_ADDED
+
+ else:
+ generation = 0
+
+ data[uid] = generation+1, etype
+
+ self._p_changed = 1
+
+ def getEvent(self, uid):
+ state = self._data.get(uid)
+ if state is not None:
+ state = state[1]
+ return state
+
+ def process(self, limit=None):
+ """Removes and returns events from this queue.
+
+ If limit is specified, at most (limit) events are removed.
+ """
+ data = self._data
+ if not limit or len(data) <= limit:
+ self._data = {}
+ return data
+ else:
+ self._p_changed = 1
+ res = {}
+ keys = data.keys()[:limit]
+ for key in keys:
+ res[key] = data[key]
+ del data[key]
+ return res
+
+ def _p_resolveConflict(self, oldstate, committed, newstate):
+ # Apply the changes made in going from old to newstate to
+ # committed
+
+ # Note that in the case of undo, the olddata is the data for
+ # the transaction being undone and newdata is the data for the
+ # transaction previous to the undone transaction.
+
+ # Find the conflict policy on the new state to make sure changes
+ # to it will be applied
+ policy = newstate['_conflict_policy']
+
+ # Committed is always the currently committed data.
+ oldstate_data = oldstate['_data']
+ committed_data = committed['_data']
+ newstate_data = newstate['_data']
+
+ # Merge newstate changes into committed
+ for uid, new in newstate_data.items():
+
+ # Decide if this is a change
+ old = oldstate_data.get(uid)
+ current = committed_data.get(uid)
+
+ if new != old:
+ # something changed
+
+ if old is not None:
+ # got a repeat event
+ if new[0] < old[0]:
+ # This was an undo, so give the event the undo
+ # time and convert to an anti event of the old
+ # (undone) event.
+ new = (0, antiEvent(old[1]))
+ elif new[1] is ADDED:
+ if policy == SAFE_POLICY:
+ logger.error('Queue conflict on %s: ADDED on existing item' % uid)
+ raise ConflictError
+ else:
+ if current and current[1] == REMOVED:
+ new = current
+ else:
+ new = (current[0]+1, CHANGED_ADDED)
+
+
+ # remove this event from old, so that we don't
+ # mess with it later.
+ del oldstate_data[uid]
+
+ # Check aqainst current value. Either we want a
+ # different event, in which case we give up, or we
+ # do nothing.
+ if current is not None:
+ if current[1] != new[1]:
+ if policy == SAFE_POLICY:
+ # This is too complicated, bail
+ logger.error('Queue conflict on %s' % uid)
+ raise ConflictError
+ elif REMOVED not in (new[1], current[1]):
+ new = (current[0]+1, CHANGED_ADDED)
+ committed_data[uid] = new
+ elif ( current[0] < new[0] and
+ new[1] == REMOVED ):
+ committed_data[uid] = new
+
+ # remove this event from old, so that we don't
+ # mess with it later.
+ if oldstate_data.get(uid) is not None:
+ del oldstate_data[uid]
+
+ # nothing to do
+ continue
+
+ committed_data[uid] = new
+
+ # Now handle remaining events in old that weren't in new.
+ # These *must* be undone events!
+ for uid, old in oldstate_data.items():
+ new = (0, antiEvent(old[1]))
+
+ # See above
+ current = committed_data.get(uid)
+ if current is not None:
+ if current[1] != new[1]:
+ # This is too complicated, bail
+ logger.error('Queue conflict on %s processing undos' % uid)
+ raise ConflictError
+ # nothing to do
+ continue
+
+ committed_data[uid] = new
+
+ return { '_data': committed_data
+ , '_conflict_policy' : policy
+ }
+
+__doc__ = CatalogEventQueue.__doc__ + __doc__
+
+
+
+# Old worries
+
+# We have a problem. We have to make sure that we don't lose too
+# much history to undo, but we don't want to retain the entire
+# history. We certainly don't want to execute the entire history
+# when we execute a trans.
+#
+# Baah, no worry, if we undo in a series of unprocessed events, we
+# simply restore the old event, which we have in the old state.
+
+
Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,15 @@
+*************
+Catalog Queue
+*************
+
+A catalog queue provides a queue for catalog indexing. The basic idea
+is to queue catalog operations so:
+
+- Operations can be batched for greater efficiency
+
+- Application requests don't have to wait for indexing to be done
+
+The benefits of queueing are especially significant when text indexes
+are used.
+
+.. contents::
Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1 @@
+#
Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,39 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import zope.interface
+
+class ICatalogQueue(zope.interface.Interface):
+
+ def add(id):
+ """Add the object with the given id to the catalog
+ """
+
+ def update(id):
+ """Update the object with the given id in the catalog
+ """
+
+ def remove(id):
+ """Remove the object with the given id in the catalog
+ """
+
+ def process(ids, catalogs, limit):
+ """Process up to limit objects, returning the number processed
+
+ The first argument is an object with a getObject(id) method.
+
+ Catalogs is a multi-iterable collection of
+ zope.index.interfaces.IInjection objects to be updated.
+ """
+
Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,68 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from zc.catalogqueue.CatalogEventQueue import REMOVED, CHANGED, ADDED
+
+import logging
+import persistent
+import zc.catalogqueue.CatalogEventQueue
+import zc.catalogqueue.interfaces
+import zope.interface
+
+logger = logging.getLogger(__name__)
+
+class CatalogQueue(persistent.Persistent):
+
+ zope.interface.implements(zc.catalogqueue.interfaces.ICatalogQueue)
+
+ _buckets = 1009 # Maybe configurable someday
+
+ def __init__(self):
+ self._queues = [
+ zc.catalogqueue.CatalogEventQueue.CatalogEventQueue()
+ for i in range(self._buckets)
+ ]
+
+ def _notify(self, id, event):
+ self._queues[hash(id) % self._buckets].update(id, event)
+
+ def add(self, id):
+ self._notify(id, ADDED)
+
+ def update(self, id):
+ self._notify(id, CHANGED)
+
+ def remove(self, id):
+ self._notify(id, REMOVED)
+
+ def process(self, ids, catalogs, limit):
+ done = 0
+ for queue in self._queues:
+ for id, (_, event) in queue.process(limit-done).iteritems():
+ if event is REMOVED:
+ for catalog in catalogs:
+ catalog.unindex_doc(id)
+ else:
+ ob = ids.queryObject(id)
+ if ob is None:
+ logger.warn("Couldn't find object for %s", id)
+ else:
+ for catalog in catalogs:
+ catalog.index_doc(id, ob)
+ done += 1
+
+ if done >= limit:
+ break
+
+ return done
Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,129 @@
+Using Queues
+============
+
+A queue is created by instantiating a
+zc.catalogqueue.queue.CatalogQueue object:
+
+ >>> import zc.catalogqueue.queue
+ >>> queue = zc.catalogqueue.queue.CatalogQueue()
+
+Typically, queues are registered as
+zc.catalogqueue.interfaces.ICatalogQueue utilities.
+
+ >>> import zope.interface, pprint
+ >>> pprint.pprint(sorted(zope.interface.providedBy(queue)), width=1)
+ [<InterfaceClass zc.catalogqueue.interfaces.ICatalogQueue>,
+ <InterfaceClass persistent.interfaces.IPersistent>]
+
+Queues are used in 2 ways. As content are modified, we call add,
+update, and remove methods on the queue:
+
+ >>> queue.add(1)
+ >>> queue.update(1)
+ >>> queue.remove(1)
+
+ >>> queue.update(2)
+ >>> queue.update(2)
+
+ >>> queue.add(3)
+ >>> queue.update(3)
+ >>> queue.add(3)
+ Traceback (most recent call last):
+ ...
+ TypeError: Attempt to add an object that is already in the catalog
+
+ >>> queue.update(4)
+ >>> queue.update(4)
+ >>> queue.update(4)
+
+ >>> queue.remove(5)
+ >>> queue.update(5)
+ Traceback (most recent call last):
+ ...
+ TypeError: Attempt to change an object that has been removed
+
+ >>> queue.update(0)
+ >>> queue.update(0)
+
+Periodically, we call process on the queue. We need to pass an ids
+object and a collection of injection (catalog) objects:
+
+ >>> class Ids:
+ ... def queryObject(self, id, default=None):
+ ... if not id:
+ ... return default
+ ... return "object %s" % id
+
+ >>> class Injection:
+ ... def __init__(self, name):
+ ... self.name = name
+ ... def index_doc(self, docid, value):
+ ... print self.name, 'indexing', docid, value
+ ... def unindex_doc(self, docid):
+ ... print self.name, 'unindexing', docid
+
+ >>> queue.process(Ids(), [Injection('cat1'), Injection('cat2')], 10)
+ cat1 unindexing 1
+ cat2 unindexing 1
+ cat1 indexing 2 object 2
+ cat2 indexing 2 object 2
+ cat1 indexing 3 object 3
+ cat2 indexing 3 object 3
+ cat1 indexing 4 object 4
+ cat2 indexing 4 object 4
+ cat1 unindexing 5
+ cat2 unindexing 5
+ 6
+
+There are a number of things to note about this example:
+
+- Each object is processed only once.
+
+- What happens depends on th elast event.
+
+- Object 0 wasn't indexed because queryObject returned None. We
+ ignore events for objects that have been removed from the intid
+ utility.
+
+- The number of objects processed is returned.
+
+If we process the queue without additional events, we'll just get 0
+back:
+
+ >>> queue.process(Ids(), [Injection('cat1'), Injection('cat2')], 10)
+ 0
+
+Of course, the limit argument limites how many events we process:
+
+ >>> for i in range(10):
+ ... queue.update(i)
+ >>> queue.process(Ids(), [Injection('cat1')], 5)
+ cat1 indexing 1 object 1
+ cat1 indexing 2 object 2
+ cat1 indexing 3 object 3
+ cat1 indexing 4 object 4
+ 5
+
+ >>> queue.process(Ids(), [Injection('cat1')], 5)
+ cat1 indexing 5 object 5
+ cat1 indexing 6 object 6
+ cat1 indexing 7 object 7
+ cat1 indexing 8 object 8
+ cat1 indexing 9 object 9
+ 5
+
+(Remember that 0 isn't processed because it can't be found.)
+
+When an object can't be found, a warning is logged:
+
+ >>> import zope.testing.loggingsupport
+ >>> handler = zope.testing.loggingsupport.InstalledHandler('zc')
+ >>> queue.update(0)
+ >>> queue.process(Ids(), [Injection('cat1')], 5)
+ 1
+
+ >>> print handler
+ zc.catalogqueue.queue WARNING
+ Couldn't find object for 0
+
+ >>> handler.uninstall()
Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1 @@
+#
Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Copied: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_CatalogEventQueue.py (from rev 85444, Products.QueueCatalog/trunk/tests/test_CatalogEventQueue.py)
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_CatalogEventQueue.py (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_CatalogEventQueue.py 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,321 @@
+##############################################################################
+#
+# Copyright (c) 2002-2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import transaction
+
+from zc.catalogqueue.CatalogEventQueue import CatalogEventQueue
+from zc.catalogqueue.CatalogEventQueue import ADDED
+from zc.catalogqueue.CatalogEventQueue import CHANGED
+from zc.catalogqueue.CatalogEventQueue import CHANGED_ADDED
+from zc.catalogqueue.CatalogEventQueue import REMOVED
+from zc.catalogqueue.CatalogEventQueue import SAFE_POLICY
+from zc.catalogqueue.CatalogEventQueue import ALTERNATIVE_POLICY
+from ZODB.POSException import ConflictError
+
+
+class QueueConflictTests(unittest.TestCase):
+
+ def _setAlternativePolicy(self):
+ # Apply the alternative conflict resolution policy
+ self.queue._conflict_policy = ALTERNATIVE_POLICY
+ self.queue._p_jar.transaction_manager.commit()
+ self.queue2._p_jar.sync()
+
+ self.assertEquals(self.queue._conflict_policy, ALTERNATIVE_POLICY)
+ self.assertEquals(self.queue2._conflict_policy, ALTERNATIVE_POLICY)
+
+
+ def _insane_update(self, queue, uid, etype):
+ # Queue update method that allows insane state changes, needed
+ # to provoke pathological queue states
+ data = queue._data
+ current = data.get(uid)
+ if current is not None:
+ generation, current = current
+
+ if ((current is ADDED or current is CHANGED_ADDED)
+ and etype is CHANGED):
+ etype = CHANGED_ADDED
+ else:
+ generation = 0
+
+ data[uid] = generation+1, etype
+
+ queue._p_changed = 1
+
+ def openDB(self):
+ from ZODB.FileStorage import FileStorage
+ from ZODB.DB import DB
+ self.dir = tempfile.mkdtemp()
+ self.storage = FileStorage(os.path.join(self.dir, 'testQCConflicts.fs'))
+ self.db = DB(self.storage)
+
+ def setUp(self):
+ self.openDB()
+ queue = CatalogEventQueue()
+
+ tm1 = transaction.TransactionManager()
+ self.conn1 = self.db.open(transaction_manager=tm1)
+ r1 = self.conn1.root()
+ r1["queue"] = queue
+ del queue
+ self.queue = r1["queue"]
+ tm1.commit()
+
+ tm2 = transaction.TransactionManager()
+ self.conn2 = self.db.open(transaction_manager=tm2)
+ r2 = self.conn2.root()
+ self.queue2 = r2["queue"]
+ ignored = dir(self.queue2) # unghostify
+
+ def tearDown(self):
+ transaction.abort()
+ del self.queue
+ del self.queue2
+ if self.storage is not None:
+ self.storage.close()
+ self.storage.cleanup()
+ shutil.rmtree(self.dir)
+
+ def test_rig(self):
+ # Test the test rig
+ self.assertEqual(self.queue._p_serial, self.queue2._p_serial)
+
+ def test_simpleConflict(self):
+ # Using the first connection, index 10 paths
+ for n in range(10):
+ self.queue.update('/f%i' % n, ADDED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ # After this run, the first connection's queuecatalog has 10
+ # entries, the second has none.
+ self.assertEqual(len(self.queue), 10)
+ self.assertEqual(len(self.queue2), 0)
+
+ # Using the second connection, index the other 10 folders
+ for n in range(10):
+ self.queue2.update('/g%i' % n, ADDED)
+
+ # Now both connections' queuecatalogs have 10 entries each, but
+ # for differrent objects
+ self.assertEqual(len(self.queue), 10)
+ self.assertEqual(len(self.queue2), 10)
+
+ # Now we commit. Conflict resolution on the catalog queue should
+ # kick in because both connections have changes. Since none of the
+ # events collide, we should end up with 20 entries in our catalogs.
+ self.queue2._p_jar.transaction_manager.commit()
+ self.queue._p_jar.sync()
+ self.queue2._p_jar.sync()
+ self.assertEqual(len(self.queue), 20)
+ self.assertEqual(len(self.queue2), 20)
+
+ def test_unresolved_add_after_something(self):
+ # If an event is encountered for an object and we are trying to
+ # commit an ADDED event, a conflict is encountered
+
+ self.queue.update('/f0', ADDED)
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ self.queue2.update('/f0', ADDED)
+ self.queue2.update('/f0', CHANGED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ self._insane_update(self.queue, '/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ self._insane_update(self.queue2, '/f0', ADDED)
+ self.assertRaises( ConflictError
+ , self.queue2._p_jar.transaction_manager.commit
+ )
+
+ def test_resolved_add_after_nonremoval(self):
+ # If an event is encountered for an object and we are trying to
+ # commit an ADDED event while the conflict resolution policy is
+ # NOT the SAFE_POLICY, we won't get a conflict.
+ self._setAlternativePolicy()
+
+ self.queue.update('/f0', ADDED)
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ self.queue2.update('/f0', ADDED)
+ self.queue2.update('/f0', CHANGED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ self._insane_update(self.queue, '/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ # If we had a conflict, this would blow up
+ self._insane_update(self.queue2, '/f0', ADDED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ # After the conflict has been resolved, we expect the queues to
+ # containa a CHANGED_ADDED event.
+ self.queue._p_jar.sync()
+ self.queue2._p_jar.sync()
+ self.assertEquals(len(self.queue), 1)
+ self.assertEquals(len(self.queue2), 1)
+ event1 = self.queue.getEvent('/f0')
+ event2 = self.queue2.getEvent('/f0')
+ self.failUnless(event1 == event2 == CHANGED_ADDED)
+
+ def test_resolved_add_after_removal(self):
+ # If a REMOVED event is encountered for an object and we are trying to
+ # commit an ADDED event while the conflict resolution policy is
+ # NOT the SAFE_POLICY, we won't get a conflict.
+ self._setAlternativePolicy()
+
+ self.queue.update('/f0', ADDED)
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ self.queue2.update('/f0', ADDED)
+ self.queue2.update('/f0', CHANGED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ self.queue.update('/f0', REMOVED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ # If we had a conflict, this would blow up
+ self._insane_update(self.queue2, '/f0', ADDED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ # After the conflict has been resolved, we expect the queue to
+ # contain a REMOVED event.
+ self.queue._p_jar.sync()
+ self.queue2._p_jar.sync()
+ self.assertEquals(len(self.queue), 1)
+ self.assertEquals(len(self.queue2), 1)
+ event1 = self.queue.getEvent('/f0')
+ event2 = self.queue2.getEvent('/f0')
+ self.failUnless(event1 == event2 == REMOVED)
+
+ def test_unresolved_new_old_current_all_different(self):
+ # If the events we get from the current, new and old states are
+ # all different, we throw in the towel in the form of a conflict.
+ # This test relies on the fact that no OLD state is de-facto treated
+ # as a state.
+
+
+ self.queue.update('/f0', ADDED)
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ # This commit should now raise a conflict
+ self._insane_update(self.queue2, '/f0', REMOVED)
+ self.assertRaises( ConflictError
+ , self.queue2._p_jar.transaction_manager.commit
+ )
+
+
+ def test_resolved_new_old_current_all_different(self):
+ # If the events we get from the current, new and old states are
+ # all different and the SAFE_POLICY conflict resolution policy is
+ # not enforced, the conflict resolves without bloodshed.
+ # This test relies on the fact that no OLD state is de-facto treated
+ # as a state.
+ self._setAlternativePolicy()
+
+ self.queue.update('/f0', ADDED)
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ # This commit should not raise a conflict
+ self._insane_update(self.queue2, '/f0', REMOVED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ # In this scenario (the incoming new state has a REMOVED event),
+ # the new state is disregarded and the old state is used. We are
+ # left with a CHANGED_ADDED event. (see queue.update method; ADDED
+ # plus CHANGED results in CHANGED_ADDED)
+ self.queue._p_jar.sync()
+ self.queue2._p_jar.sync()
+ self.assertEquals(len(self.queue), 1)
+ self.assertEquals(len(self.queue2), 1)
+ event1 = self.queue.getEvent('/f0')
+ event2 = self.queue2.getEvent('/f0')
+ self.failUnless(event1 == event2 == CHANGED_ADDED)
+
+ def test_unresolved_new_old_current_all_different_2(self):
+ # If the events we get from the current, new and old states are
+ # all different, we throw in the towel in the form of a conflict.
+ # This test relies on the fact that no OLD state is de-facto treated
+ # as a state.
+
+
+ self.queue.update('/f0', ADDED)
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ self.queue2.update('/f0', ADDED)
+ self.queue2.update('/f0', CHANGED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ # This commit should now raise a conflict
+ self._insane_update(self.queue2, '/f0', REMOVED)
+ self.assertRaises( ConflictError
+ , self.queue2._p_jar.transaction_manager.commit
+ )
+
+ def test_resolved_new_old_current_all_different_2(self):
+ # If the events we get from the current, new and old states are
+ # all different and the SAFE_POLICY conflict resolution policy is
+ # not enforced, the conflict resolves without bloodshed.
+ self._setAlternativePolicy()
+
+ self.queue.update('/f0', ADDED)
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ self.queue2.update('/f0', ADDED)
+ self.queue2.update('/f0', CHANGED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ self.queue.update('/f0', CHANGED)
+ self.queue._p_jar.transaction_manager.commit()
+
+ # This commit should not raise a conflict
+ self._insane_update(self.queue2, '/f0', REMOVED)
+ self.queue2._p_jar.transaction_manager.commit()
+
+ # In this scenario (the incoming new state has a REMOVED event),
+ # we will take the new state to resolve the conflict, because its
+ # generation number is higher then the oldstate and current state.
+ self.queue._p_jar.sync()
+ self.queue2._p_jar.sync()
+ self.assertEquals(len(self.queue), 1)
+ self.assertEquals(len(self.queue2), 1)
+ event1 = self.queue.getEvent('/f0')
+ event2 = self.queue2.getEvent('/f0')
+ self.failUnless(event1 == event2 == REMOVED)
+
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite(QueueConflictTests),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
+
Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py 2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,25 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+from zope.testing import doctest
+
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocFileSuite('../queue.txt'),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
+
Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
More information about the Checkins
mailing list