[Checkins] SVN: zc.catalogqueue/branches/dev/ Ported Zope 2's QueuedCatalog to Zope 3 giving it a lower-level

Jim Fulton jim at zope.com
Wed Apr 16 18:57:31 EDT 2008


Log message for revision 85445:
  Ported Zope 2's QueueCatalog to Zope 3 giving it a lower-level
  (simpler) API.  The core module, CatalogEventQueue, with its tests, is
  copied almost verbatim from Products.QueueCatalog.
  

Changed:
  U   zc.catalogqueue/branches/dev/buildout.cfg
  U   zc.catalogqueue/branches/dev/setup.py
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/CatalogEventQueue.py
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_CatalogEventQueue.py
  A   zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py

-=-
Modified: zc.catalogqueue/branches/dev/buildout.cfg
===================================================================
--- zc.catalogqueue/branches/dev/buildout.cfg	2008-04-16 19:57:40 UTC (rev 85444)
+++ zc.catalogqueue/branches/dev/buildout.cfg	2008-04-16 22:57:30 UTC (rev 85445)
@@ -1,10 +1,14 @@
 [buildout]
 develop = .
 parts = test py
+versions = versions
 
+[versions]
+ZODB3 = 3.8.0
+
 [test]
 recipe = zc.recipe.testrunner
-eggs = 
+eggs = zc.catalogqueue
 
 [py]
 recipe = zc.recipe.egg

Modified: zc.catalogqueue/branches/dev/setup.py
===================================================================
--- zc.catalogqueue/branches/dev/setup.py	2008-04-16 19:57:40 UTC (rev 85444)
+++ zc.catalogqueue/branches/dev/setup.py	2008-04-16 22:57:30 UTC (rev 85445)
@@ -22,14 +22,19 @@
                              )).read()
 
 long_description = (
-        read('src/zc/?/README.txt')
+        read('src/zc/catalogqueue/README.txt')
         + '\n' +
+        'Detailed Documentation\n'
+        '**********************\n'
+        + '\n' +
+        read('src/zc/catalogqueue/queue.txt')
+        + '\n' +
         'Download\n'
-        '--------\n'
+        '********\n'
         )
 
 setup(
-    name = '',
+    name = 'zc.catalogqueue',
     version = '0.1',
     author = 'Jim Fulton',
     author_email = 'jim at zope.com',
@@ -40,7 +45,10 @@
     packages = find_packages('src'),
     namespace_packages = ['zc'],
     package_dir = {'': 'src'},
-    install_requires = ['setuptools'],
+    install_requires = [
+        'setuptools',
+        'ZODB3',
+        ],
     zip_safe = False,
     entry_points=entry_points,
     include_package_data = True,

Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,7 @@
+Change History
+**************
+
+0.1 (2008-04-16)
+================
+
+Initial release


Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/CHANGES.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Copied: zc.catalogqueue/branches/dev/src/zc/catalogqueue/CatalogEventQueue.py (from rev 85444, Products.QueueCatalog/trunk/CatalogEventQueue.py)
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/CatalogEventQueue.py	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/CatalogEventQueue.py	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,289 @@
+##############################################################################
+#
+# Copyright (c) 2002-2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+# 
+##############################################################################
+"""
+$Id$
+"""
+
+import logging
+
+from persistent import Persistent
+from ZODB.POSException import ConflictError
+
+logger = logging.getLogger(__name__)
+
+SAFE_POLICY = 0
+ALTERNATIVE_POLICY = 1
+
+REMOVED       = 0
+ADDED         = 1
+CHANGED       = 2
+CHANGED_ADDED = 3
+EVENT_TYPES = (REMOVED, CHANGED, ADDED, CHANGED_ADDED)
+antiEvent = {REMOVED:       ADDED,
+             ADDED:         REMOVED,
+             CHANGED:       CHANGED,
+             CHANGED_ADDED: CHANGED_ADDED,
+             }.get
+
+ADDED_EVENTS = (CHANGED, ADDED, CHANGED_ADDED)
+
+
+class CatalogEventQueue(Persistent):
+    """Event queue for catalog events
+
+    This is a rather odd queue. It organizes events by object, where
+    objects are identified by uids, which happen to be string paths.
+
+    One way that this queue is extremely odd is that it really only
+    keeps track of the last event for an object. This is because we
+    really only *care* about the last event for an object.
+
+    There are four types of events:
+
+    ADDED         -- An object was added to the catalog
+
+    CHANGED       -- An object was changed
+
+    REMOVED       -- An object was removed from the catalog
+
+    CHANGED_ADDED -- An object was added and subsequently changed.
+                     This event is a consequence of the queue implementation.
+                     
+    Note that, although we only keep track of the most recent
+    event, there are rules for how the most recent event can be
+    updated:
+
+    - It is illegal to update an ADDED, CHANGED, or CHANGED_ADDED
+      event with an ADDED event or
+
+    - to update a REMOVED event with a CHANGED event.
+
+    We have a problem because applications don't really indicate
+    whether they are adding or just updating.  We deduce add
+    events by examining the catalog and event queue states.
+
+    Also note that, when events are applied to the catalog, events may
+    have no effect.
+
+    - If an object is in the catalog, ADDED events are equivalent to
+      CHANGED events.
+
+    - If an object is not in the catalog, REMOVED and CHANGED events
+      have no effect.
+
+    If we undo a transaction, we generate an anti-event. The anti
+    event of ADDED is REMOVED, of REMOVED is ADDED, and CHANGED and
+    CHANGED_ADDED are each their own anti-event.
+
+    Note that these rules represent heuristics that attempt to provide
+    efficient and sensible behavior for most cases. They are not "correct" in
+    that they handle cases that may not seem handleable. For example,
+    consider a sequence of transactions:
+
+      T1 adds an object
+      T2 removes the object
+      T3 adds the object
+      T4 processes the queue
+      T5 undoes T1
+
+    It's not clear what should be done in this case? We decide to
+    generate a remove event, even though a later transaction added the
+    object again. Is this correct? It's hard to say. The decision we
+    make is not horrible and it allows us to provide a very efficient
+    implementation.  See the unit tests for other scenarios. Feel
+    free to think of cases for which our decisions are unacceptably
+    wrong and write unit tests for these cases.
+
+    There are two kinds of transactions that affect the queue:
+
+    - Application transactions always add or modify events. They never
+      remove events.
+
+    - Queue processing transactions always remove events.
+    
+    """
+
+    _conflict_policy = SAFE_POLICY
+
+    def __init__(self, conflict_policy=SAFE_POLICY):
+
+        # Mapping from uid -> (generation, event type)
+        self._data = {}
+        self._conflict_policy = conflict_policy
+
+    def __nonzero__(self):
+        return not not self._data
+
+    def __len__(self):
+        return len(self._data)
+        
+    def update(self, uid, etype):
+        assert etype in EVENT_TYPES
+        data = self._data
+        current = data.get(uid)
+        if current is not None:
+            generation, current = current
+            if current in ADDED_EVENTS and etype is ADDED:
+                raise TypeError("Attempt to add an object that is already "
+                                "in the catalog")
+            if current is REMOVED and etype is CHANGED:
+                raise TypeError("Attempt to change an object that has "
+                                "been removed")
+
+            if ((current is ADDED or current is CHANGED_ADDED)
+                and etype is CHANGED):
+                etype = CHANGED_ADDED
+                
+        else:
+            generation = 0
+
+        data[uid] = generation+1, etype
+
+        self._p_changed = 1
+
+    def getEvent(self, uid):
+        state = self._data.get(uid)
+        if state is not None:
+            state = state[1]
+        return state
+
+    def process(self, limit=None):
+        """Removes and returns events from this queue.
+
+        If limit is specified, at most (limit) events are removed.
+        """
+        data = self._data
+        if not limit or len(data) <= limit:
+            self._data = {}
+            return data
+        else:
+            self._p_changed = 1
+            res = {}
+            keys = data.keys()[:limit]
+            for key in keys:
+                res[key] = data[key]
+                del data[key]
+            return res
+
+    def _p_resolveConflict(self, oldstate, committed, newstate):
+        # Apply the changes made in going from old to newstate to
+        # committed
+
+        # Note that in the case of undo, the olddata is the data for
+        # the transaction being undone and newdata is the data for the
+        # transaction previous to the undone transaction.
+
+        # Find the conflict policy on the new state to make sure changes
+        # to it will be applied
+        policy = newstate['_conflict_policy']
+
+        # Committed is always the currently committed data.
+        oldstate_data  =  oldstate['_data']
+        committed_data = committed['_data']
+        newstate_data  =  newstate['_data']
+
+        # Merge newstate changes into committed
+        for uid, new in newstate_data.items():
+
+            # Decide if this is a change
+            old = oldstate_data.get(uid)
+            current = committed_data.get(uid)
+            
+            if new != old:
+                # something changed
+
+                if old is not None:
+                    # got a repeat event
+                    if new[0] < old[0]:
+                        # This was an undo, so give the event the undo
+                        # time and convert to an anti event of the old
+                        # (undone) event. 
+                        new = (0, antiEvent(old[1]))
+                    elif new[1] is ADDED:
+                        if policy == SAFE_POLICY:
+                            logger.error('Queue conflict on %s: ADDED on existing item' % uid)
+                            raise ConflictError
+                        else:
+                            if current and current[1] == REMOVED:
+                                new = current
+                            else:
+                                new = (current[0]+1, CHANGED_ADDED)
+                            
+
+                    # remove this event from old, so that we don't
+                    # mess with it later.
+                    del oldstate_data[uid]
+
+                # Check against current value. Either we want a
+                # different event, in which case we give up, or we
+                # do nothing.
+                if current is not None:
+                    if current[1] != new[1]:
+                        if policy == SAFE_POLICY:
+                            # This is too complicated, bail
+                            logger.error('Queue conflict on %s' % uid)
+                            raise ConflictError
+                        elif REMOVED not in (new[1], current[1]):
+                            new = (current[0]+1, CHANGED_ADDED)
+                            committed_data[uid] = new
+                        elif ( current[0] < new[0] and
+                               new[1] == REMOVED ):
+                            committed_data[uid] = new
+
+                        # remove this event from old, so that we don't
+                        # mess with it later.
+                        if oldstate_data.get(uid) is not None:
+                            del oldstate_data[uid]
+
+                    # nothing to do
+                    continue
+
+                committed_data[uid] = new
+
+        # Now handle remaining events in old that weren't in new.
+        # These *must* be undone events!
+        for uid, old in oldstate_data.items():
+            new = (0, antiEvent(old[1]))
+            
+            # See above
+            current = committed_data.get(uid)
+            if current is not None:
+                if current[1] != new[1]:
+                    # This is too complicated, bail
+                    logger.error('Queue conflict on %s processing undos' % uid)
+                    raise ConflictError
+                # nothing to do
+                continue
+
+            committed_data[uid] = new
+
+        return { '_data': committed_data
+               , '_conflict_policy' : policy
+               }
+
+__doc__ = CatalogEventQueue.__doc__ + __doc__
+
+
+
+# Old worries
+
+# We have a problem. We have to make sure that we don't lose too
+# much history to undo, but we don't want to retain the entire
+# history. We certainly don't want to execute the entire history
+# when we execute a trans.
+#
+# Baah, no worry, if we undo in a series of unprocessed events, we
+# simply restore the old event, which we have in the old state.
+
+

Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,15 @@
+*************
+Catalog Queue
+*************
+
+A catalog queue provides a queue for catalog indexing. The basic idea
+is to queue catalog operations so:
+
+- Operations can be batched for greater efficiency
+
+- Application requests don't have to wait for indexing to be done
+
+The benefits of queueing are especially significant when text indexes
+are used.
+
+.. contents::


Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/README.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1 @@
+#


Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/__init__.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,39 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import zope.interface
+
+class ICatalogQueue(zope.interface.Interface):
+
+    def add(id):
+        """Add the object with the given id to the catalog
+        """
+
+    def update(id):
+        """Update the object with the given id in the catalog
+        """
+
+    def remove(id):
+        """Remove the object with the given id from the catalog
+        """
+
+    def process(ids, catalogs, limit):
+        """Process up to limit objects, returning the number processed
+
+        The first argument is an object with a queryObject(id) method.
+
+        Catalogs is a multi-iterable collection of
+        zope.index.interfaces.IInjection objects to be updated.
+        """
+        


Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/interfaces.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,68 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from zc.catalogqueue.CatalogEventQueue import REMOVED, CHANGED, ADDED
+
+import logging
+import persistent
+import zc.catalogqueue.CatalogEventQueue
+import zc.catalogqueue.interfaces
+import zope.interface
+
+logger = logging.getLogger(__name__)
+
+class CatalogQueue(persistent.Persistent):
+
+    zope.interface.implements(zc.catalogqueue.interfaces.ICatalogQueue)
+
+    _buckets = 1009 # Maybe configurable someday
+
+    def __init__(self):
+        self._queues = [
+            zc.catalogqueue.CatalogEventQueue.CatalogEventQueue()
+            for i in range(self._buckets)
+            ]
+
+    def _notify(self, id, event):
+        self._queues[hash(id) % self._buckets].update(id, event)
+
+    def add(self, id):
+        self._notify(id, ADDED)
+
+    def update(self, id):
+        self._notify(id, CHANGED)
+
+    def remove(self, id):
+        self._notify(id, REMOVED)
+
+    def process(self, ids, catalogs, limit):
+        done = 0
+        for queue in self._queues:
+            for id, (_, event) in queue.process(limit-done).iteritems():
+                if event is REMOVED:
+                    for catalog in catalogs:
+                        catalog.unindex_doc(id)
+                else:
+                    ob = ids.queryObject(id)
+                    if ob is None:
+                        logger.warn("Couldn't find object for %s", id)
+                    else:
+                        for catalog in catalogs:
+                            catalog.index_doc(id, ob)
+                done += 1
+
+            if done >= limit:
+                break
+
+        return done


Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,129 @@
+Using Queues
+============
+
+A queue is created by instantiating a
+zc.catalogqueue.queue.CatalogQueue object:
+
+    >>> import zc.catalogqueue.queue
+    >>> queue = zc.catalogqueue.queue.CatalogQueue()
+
+Typically, queues are registered as
+zc.catalogqueue.interfaces.ICatalogQueue utilities.
+
+    >>> import zope.interface, pprint
+    >>> pprint.pprint(sorted(zope.interface.providedBy(queue)), width=1)
+    [<InterfaceClass zc.catalogqueue.interfaces.ICatalogQueue>,
+     <InterfaceClass persistent.interfaces.IPersistent>]
+
+Queues are used in 2 ways.  As content is modified, we call add,
+update, and remove methods on the queue:
+
+    >>> queue.add(1)
+    >>> queue.update(1)
+    >>> queue.remove(1)
+
+    >>> queue.update(2)
+    >>> queue.update(2)
+
+    >>> queue.add(3)
+    >>> queue.update(3)
+    >>> queue.add(3)
+    Traceback (most recent call last):
+    ...
+    TypeError: Attempt to add an object that is already in the catalog
+
+    >>> queue.update(4)
+    >>> queue.update(4)
+    >>> queue.update(4)
+
+    >>> queue.remove(5)
+    >>> queue.update(5)
+    Traceback (most recent call last):
+    ...
+    TypeError: Attempt to change an object that has been removed
+
+    >>> queue.update(0)
+    >>> queue.update(0)
+
+Periodically, we call process on the queue.  We need to pass an ids
+object and a collection of injection (catalog) objects:
+
+    >>> class Ids:
+    ...     def queryObject(self, id, default=None):
+    ...         if not id:
+    ...             return default
+    ...         return "object %s" % id
+
+    >>> class Injection:
+    ...     def __init__(self, name):
+    ...         self.name = name
+    ...     def index_doc(self, docid, value):
+    ...         print self.name, 'indexing', docid, value
+    ...     def unindex_doc(self, docid):
+    ...         print self.name, 'unindexing', docid
+
+    >>> queue.process(Ids(), [Injection('cat1'), Injection('cat2')], 10)
+    cat1 unindexing 1
+    cat2 unindexing 1
+    cat1 indexing 2 object 2
+    cat2 indexing 2 object 2
+    cat1 indexing 3 object 3
+    cat2 indexing 3 object 3
+    cat1 indexing 4 object 4
+    cat2 indexing 4 object 4
+    cat1 unindexing 5
+    cat2 unindexing 5
+    6
+
+There are a number of things to note about this example:
+
+- Each object is processed only once.
+
+- What happens depends on the last event.
+
+- Object 0 wasn't indexed because queryObject returned None.  We
+  ignore events for objects that have been removed from the intid
+  utility.
+
+- The number of objects processed is returned.
+
+If we process the queue without additional events, we'll just get 0
+back:
+
+    >>> queue.process(Ids(), [Injection('cat1'), Injection('cat2')], 10)
+    0
+
+Of course, the limit argument limits how many events we process:
+
+    >>> for i in range(10):
+    ...     queue.update(i)
+    >>> queue.process(Ids(), [Injection('cat1')], 5)
+    cat1 indexing 1 object 1
+    cat1 indexing 2 object 2
+    cat1 indexing 3 object 3
+    cat1 indexing 4 object 4
+    5
+
+    >>> queue.process(Ids(), [Injection('cat1')], 5)
+    cat1 indexing 5 object 5
+    cat1 indexing 6 object 6
+    cat1 indexing 7 object 7
+    cat1 indexing 8 object 8
+    cat1 indexing 9 object 9
+    5
+
+(Remember that 0 isn't processed because it can't be found.)
+
+When an object can't be found, a warning is logged:
+
+    >>> import zope.testing.loggingsupport
+    >>> handler = zope.testing.loggingsupport.InstalledHandler('zc')
+    >>> queue.update(0)
+    >>> queue.process(Ids(), [Injection('cat1')], 5)
+    1
+
+    >>> print handler
+    zc.catalogqueue.queue WARNING
+      Couldn't find object for 0
+
+    >>> handler.uninstall()


Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/queue.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1 @@
+#


Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/__init__.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Copied: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_CatalogEventQueue.py (from rev 85444, Products.QueueCatalog/trunk/tests/test_CatalogEventQueue.py)
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_CatalogEventQueue.py	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_CatalogEventQueue.py	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,321 @@
+##############################################################################
+#
+# Copyright (c) 2002-2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+# 
+##############################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import transaction
+
+from zc.catalogqueue.CatalogEventQueue import CatalogEventQueue
+from zc.catalogqueue.CatalogEventQueue import ADDED
+from zc.catalogqueue.CatalogEventQueue import CHANGED
+from zc.catalogqueue.CatalogEventQueue import CHANGED_ADDED
+from zc.catalogqueue.CatalogEventQueue import REMOVED
+from zc.catalogqueue.CatalogEventQueue import SAFE_POLICY
+from zc.catalogqueue.CatalogEventQueue import ALTERNATIVE_POLICY
+from ZODB.POSException import ConflictError 
+
+
+class QueueConflictTests(unittest.TestCase):
+
+    def _setAlternativePolicy(self):
+        # Apply the alternative conflict resolution policy
+        self.queue._conflict_policy = ALTERNATIVE_POLICY
+        self.queue._p_jar.transaction_manager.commit()
+        self.queue2._p_jar.sync()
+
+        self.assertEquals(self.queue._conflict_policy, ALTERNATIVE_POLICY)
+        self.assertEquals(self.queue2._conflict_policy, ALTERNATIVE_POLICY)
+ 
+
+    def _insane_update(self, queue, uid, etype):
+        # Queue update method that allows insane state changes, needed
+        # to provoke pathological queue states
+        data = queue._data
+        current = data.get(uid)
+        if current is not None:
+            generation, current = current
+
+            if ((current is ADDED or current is CHANGED_ADDED)
+                and etype is CHANGED):
+                etype = CHANGED_ADDED
+        else:
+            generation = 0
+
+        data[uid] = generation+1, etype
+
+        queue._p_changed = 1
+
+    def openDB(self):
+        from ZODB.FileStorage import FileStorage
+        from ZODB.DB import DB
+        self.dir = tempfile.mkdtemp()
+        self.storage = FileStorage(os.path.join(self.dir, 'testQCConflicts.fs'))
+        self.db = DB(self.storage)
+
+    def setUp(self):
+        self.openDB()
+        queue = CatalogEventQueue()
+
+        tm1 = transaction.TransactionManager()
+        self.conn1 = self.db.open(transaction_manager=tm1)
+        r1 = self.conn1.root()
+        r1["queue"] = queue
+        del queue
+        self.queue = r1["queue"]
+        tm1.commit()
+
+        tm2 = transaction.TransactionManager()
+        self.conn2 = self.db.open(transaction_manager=tm2)
+        r2 = self.conn2.root()
+        self.queue2 = r2["queue"]
+        ignored = dir(self.queue2)    # unghostify
+
+    def tearDown(self):
+        transaction.abort()
+        del self.queue
+        del self.queue2
+        if self.storage is not None:
+            self.storage.close()
+            self.storage.cleanup()
+            shutil.rmtree(self.dir)
+
+    def test_rig(self):
+        # Test the test rig
+        self.assertEqual(self.queue._p_serial, self.queue2._p_serial)
+
+    def test_simpleConflict(self):
+        # Using the first connection, index 10 paths
+        for n in range(10):
+            self.queue.update('/f%i' % n, ADDED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        # After this run, the first connection's queuecatalog has 10
+        # entries, the second has none.
+        self.assertEqual(len(self.queue), 10)
+        self.assertEqual(len(self.queue2), 0)
+
+        # Using the second connection, index the other 10 folders
+        for n in range(10):
+            self.queue2.update('/g%i' % n, ADDED)
+
+        # Now both connections' queuecatalogs have 10 entries each, but
+        # for different objects
+        self.assertEqual(len(self.queue), 10)
+        self.assertEqual(len(self.queue2), 10)
+
+        # Now we commit. Conflict resolution on the catalog queue should
+        # kick in because both connections have changes. Since none of the
+        # events collide, we should end up with 20 entries in our catalogs.
+        self.queue2._p_jar.transaction_manager.commit()
+        self.queue._p_jar.sync()
+        self.queue2._p_jar.sync()
+        self.assertEqual(len(self.queue), 20)
+        self.assertEqual(len(self.queue2), 20)
+
+    def test_unresolved_add_after_something(self):
+        # If an event is encountered for an object and we are trying to
+        # commit an ADDED event, a conflict is encountered
+
+        self.queue.update('/f0', ADDED)
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        self.queue2.update('/f0', ADDED)
+        self.queue2.update('/f0', CHANGED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        self._insane_update(self.queue, '/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        self._insane_update(self.queue2, '/f0', ADDED)
+        self.assertRaises( ConflictError
+                         , self.queue2._p_jar.transaction_manager.commit
+                         )
+
+    def test_resolved_add_after_nonremoval(self):
+        # If an event is encountered for an object and we are trying to
+        # commit an ADDED event while the conflict resolution policy is
+        # NOT the SAFE_POLICY, we won't get a conflict.
+        self._setAlternativePolicy()
+        
+        self.queue.update('/f0', ADDED)
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        self.queue2.update('/f0', ADDED)
+        self.queue2.update('/f0', CHANGED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        self._insane_update(self.queue, '/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        # If we had a conflict, this would blow up
+        self._insane_update(self.queue2, '/f0', ADDED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        # After the conflict has been resolved, we expect the queues to
+        # contain a CHANGED_ADDED event.
+        self.queue._p_jar.sync()
+        self.queue2._p_jar.sync()
+        self.assertEquals(len(self.queue), 1)
+        self.assertEquals(len(self.queue2), 1)
+        event1 = self.queue.getEvent('/f0')
+        event2 = self.queue2.getEvent('/f0')
+        self.failUnless(event1 == event2 == CHANGED_ADDED)
+
+    def test_resolved_add_after_removal(self):
+        # If a REMOVED event is encountered for an object and we are trying to
+        # commit an ADDED event while the conflict resolution policy is
+        # NOT the SAFE_POLICY, we won't get a conflict.
+        self._setAlternativePolicy()
+        
+        self.queue.update('/f0', ADDED)
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        self.queue2.update('/f0', ADDED)
+        self.queue2.update('/f0', CHANGED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        self.queue.update('/f0', REMOVED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        # If we had a conflict, this would blow up
+        self._insane_update(self.queue2, '/f0', ADDED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        # After the conflict has been resolved, we expect the queue to
+        # contain a REMOVED event.
+        self.queue._p_jar.sync()
+        self.queue2._p_jar.sync()
+        self.assertEquals(len(self.queue), 1)
+        self.assertEquals(len(self.queue2), 1)
+        event1 = self.queue.getEvent('/f0')
+        event2 = self.queue2.getEvent('/f0')
+        self.failUnless(event1 == event2 == REMOVED)
+
+    def test_unresolved_new_old_current_all_different(self):
+        # If the events we get from the current, new and old states are
+        # all different, we throw in the towel in the form of a conflict.
+        # This test relies on the fact that no OLD state is de-facto treated
+        # as a state.
+
+
+        self.queue.update('/f0', ADDED)
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        # This commit should now raise a conflict
+        self._insane_update(self.queue2, '/f0', REMOVED)
+        self.assertRaises( ConflictError
+                         , self.queue2._p_jar.transaction_manager.commit
+                         )
+
+
+    def test_resolved_new_old_current_all_different(self):
+        # If the events we get from the current, new and old states are
+        # all different and the SAFE_POLICY conflict resolution policy is 
+        # not enforced, the conflict resolves without bloodshed.
+        # This test relies on the fact that no OLD state is de-facto treated
+        # as a state.
+        self._setAlternativePolicy()
+ 
+        self.queue.update('/f0', ADDED)
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        # This commit should not raise a conflict
+        self._insane_update(self.queue2, '/f0', REMOVED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        # In this scenario (the incoming new state has a REMOVED event), 
+        # the new state is disregarded and the old state is used. We are 
+        # left with a CHANGED_ADDED event. (see queue.update method; ADDED
+        # plus CHANGED results in CHANGED_ADDED)
+        self.queue._p_jar.sync()
+        self.queue2._p_jar.sync()
+        self.assertEquals(len(self.queue), 1)
+        self.assertEquals(len(self.queue2), 1)
+        event1 = self.queue.getEvent('/f0')
+        event2 = self.queue2.getEvent('/f0')
+        self.failUnless(event1 == event2 == CHANGED_ADDED)
+
+    def test_unresolved_new_old_current_all_different_2(self):
+        # If the events we get from the current, new and old states are
+        # all different, we throw in the towel in the form of a conflict.
+        # This test relies on the fact that no OLD state is de-facto treated
+        # as a state.
+
+
+        self.queue.update('/f0', ADDED)
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        self.queue2.update('/f0', ADDED)
+        self.queue2.update('/f0', CHANGED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        # This commit should now raise a conflict
+        self._insane_update(self.queue2, '/f0', REMOVED)
+        self.assertRaises( ConflictError
+                         , self.queue2._p_jar.transaction_manager.commit
+                         )
+
+    def test_resolved_new_old_current_all_different_2(self):
+        # If the events we get from the current, new and old states are
+        # all different and the SAFE_POLICY conflict resolution policy is 
+        # not enforced, the conflict resolves without bloodshed.
+        self._setAlternativePolicy()
+ 
+        self.queue.update('/f0', ADDED)
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        self.queue2.update('/f0', ADDED)
+        self.queue2.update('/f0', CHANGED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        self.queue.update('/f0', CHANGED)
+        self.queue._p_jar.transaction_manager.commit()
+
+        # This commit should not raise a conflict
+        self._insane_update(self.queue2, '/f0', REMOVED)
+        self.queue2._p_jar.transaction_manager.commit()
+
+        # In this scenario (the incoming new state has a REMOVED event), 
+        # we will take the new state to resolve the conflict, because its
+        # generation number is higher than the old state and current state.
+        self.queue._p_jar.sync()
+        self.queue2._p_jar.sync()
+        self.assertEquals(len(self.queue), 1)
+        self.assertEquals(len(self.queue2), 1)
+        event1 = self.queue.getEvent('/f0')
+        event2 = self.queue2.getEvent('/f0')
+        self.failUnless(event1 == event2 == REMOVED)
+
+
+def test_suite():
+    return unittest.TestSuite((
+            unittest.makeSuite(QueueConflictTests),
+                    ))
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')
+

Added: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py
===================================================================
--- zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py	                        (rev 0)
+++ zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py	2008-04-16 22:57:30 UTC (rev 85445)
@@ -0,0 +1,25 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+from zope.testing import doctest
+
+
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocFileSuite('../queue.txt'),
+        ))
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')
+


Property changes on: zc.catalogqueue/branches/dev/src/zc/catalogqueue/tests/test_queue.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native



More information about the Checkins mailing list