[Checkins] SVN: z3c.indexing.dispatch/ Initial import.

Malthe Borch mborch at gmail.com
Sat Mar 29 07:16:30 EDT 2008


Log message for revision 85009:
  Initial import.

Changed:
  A   z3c.indexing.dispatch/
  A   z3c.indexing.dispatch/trunk/
  A   z3c.indexing.dispatch/trunk/AUTHOR.txt
  A   z3c.indexing.dispatch/trunk/README.txt
  A   z3c.indexing.dispatch/trunk/bootstrap.py
  A   z3c.indexing.dispatch/trunk/buildout.cfg
  A   z3c.indexing.dispatch/trunk/setup.py
  A   z3c.indexing.dispatch/trunk/src/
  A   z3c.indexing.dispatch/trunk/src/z3c/
  A   z3c.indexing.dispatch/trunk/src/z3c/__init__.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/__init__.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/constants.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/operation.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/reducer.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/__init__.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_queue.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/utils.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py

-=-
Added: z3c.indexing.dispatch/trunk/AUTHOR.txt
===================================================================
--- z3c.indexing.dispatch/trunk/AUTHOR.txt	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/AUTHOR.txt	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,7 @@
+Authors
+=======
+
+Enfold Systems, Helge Tesdal, Andreas Zeidler, Malthe Borch
+
+Most of the code stems from other packages, and was only reshaped to
+fit into the ``z3c.indexing``-umbrella.

Added: z3c.indexing.dispatch/trunk/README.txt
===================================================================
--- z3c.indexing.dispatch/trunk/README.txt	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/README.txt	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,4 @@
+Overview
+========
+
+This package implements a transaction-safe indexing dispatcher.

Added: z3c.indexing.dispatch/trunk/bootstrap.py
===================================================================
--- z3c.indexing.dispatch/trunk/bootstrap.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/bootstrap.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Bootstrap a buildout-based project
+
+Simply run this script in a directory containing a buildout.cfg.
+The script accepts buildout command-line options, so you can
+use the -c option to specify an alternate configuration file.
+
+$Id: bootstrap.py 71627 2006-12-20 16:46:11Z jim $
+"""
+
+import os, shutil, sys, tempfile, urllib2
+
+tmpeggs = tempfile.mkdtemp()
+
+ez = {}
+exec urllib2.urlopen('http://peak.telecommunity.com/dist/ez_setup.py'
+                     ).read() in ez
+ez['use_setuptools'](to_dir=tmpeggs, download_delay=0)
+
+import pkg_resources
+
+cmd = 'from setuptools.command.easy_install import main; main()'
+if sys.platform == 'win32':
+    cmd = '"%s"' % cmd # work around spawn lamosity on windows
+
+ws = pkg_resources.working_set
+assert os.spawnle(
+    os.P_WAIT, sys.executable, sys.executable,
+    '-c', cmd, '-mqNxd', tmpeggs, 'zc.buildout',
+    dict(os.environ,
+         PYTHONPATH=
+         ws.find(pkg_resources.Requirement.parse('setuptools')).location
+         ),
+    ) == 0
+
+ws.add_entry(tmpeggs)
+ws.require('zc.buildout')
+import zc.buildout.buildout
+zc.buildout.buildout.main(sys.argv[1:] + ['bootstrap'])
+shutil.rmtree(tmpeggs)

Added: z3c.indexing.dispatch/trunk/buildout.cfg
===================================================================
--- z3c.indexing.dispatch/trunk/buildout.cfg	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/buildout.cfg	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,9 @@
+[buildout]
+develop = .
+parts = test
+
+find-links = http://download.zope.org/distribution/
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = z3c.indexing.dispatch
\ No newline at end of file

Added: z3c.indexing.dispatch/trunk/setup.py
===================================================================
--- z3c.indexing.dispatch/trunk/setup.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/setup.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,38 @@
+from setuptools import setup, find_packages
+import sys, os
+
+version = '0.1'
+
+setup(name='z3c.indexing.dispatch',
+      version=version,
+      description="Transaction-safe indexing dispatcher.",
+      long_description=open('README.txt').read(),
+      classifiers=[
+        "Framework :: Plone",
+        "Framework :: Zope2",
+        "Framework :: Zope3",
+        "Programming Language :: Python",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+        ],
+      keywords='',
+      author='Zope Corporation and Contributors',
+      author_email='zope3-dev at zope.org',
+      url='',
+      license='ZPL',
+      packages=find_packages('src'),
+      package_dir={'': 'src'},
+      namespace_packages=['z3c', 'z3c.indexing'],
+      include_package_data=True,
+      zip_safe=False,
+      install_requires=[
+          'setuptools',
+          'zope.interface',
+          'zope.component',
+          'zope.testing',
+          'transaction',
+          # -*- Extra requirements: -*-
+      ],
+      entry_points="""
+      # -*- Entry points: -*-
+      """,
+      )

Added: z3c.indexing.dispatch/trunk/src/z3c/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/__init__.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/__init__.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+try:
+    __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+    from pkgutil import extend_path
+    __path__ = extend_path(__path__, __name__)

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/__init__.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/__init__.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+try:
+    __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+    from pkgutil import extend_path
+    __path__ = extend_path(__path__, __name__)

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,27 @@
+z3c.indexing.dispatch
+=====================
+
+The indexing dispatcher is the main entry point for indexing content.
+
+A dispatcher must implement three basic operations (defined in the
+``IDispatcher`` interface): index, reindex and unindex.
+
+Dispatching flow
+----------------
+
+Dispatchers can perform indexing operations directly or defer work to
+other dispatchers using the following lookup pattern:
+
+  IDispatcher(self, obj) -> IDispatcher
+
+Example dispatching flows:
+
+  transactional dispatcher -> zcatalog
+  transactional dispatcher -> async -> xapian
+
+Transactional dispatching
+-------------------------
+
+The transactional dispatcher will queue indexing operations while
+waiting for the transaction boundary; then pass the operations on to
+the next set of dispatchers.

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1 @@
+#

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/constants.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/constants.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/constants.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,5 @@
+# constants for indexing operations
+UNINDEX = -1
+REINDEX = 0
+INDEX = 1
+

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,57 @@
+from zope.interface import Interface
+
+class IDispatcher(Interface):
+    """Interface for dispatching indexing operations.
+
+    Defines basic indexing operations corresponding to content being
+    added, modified or deleted.
+    """
+
+    def index(obj, attributes=None):
+        """Queue an index operation for the given object and attributes."""
+
+    def reindex(obj, attributes=None):
+        """Queue a reindex operation for the given object and attributes."""
+
+    def unindex(obj):
+        """Queue an unindex operation for the given object."""
+
+    def flush():
+        """Flush queue (takes no arguments; called once operations are passed on)."""
+        
+
+class ITransactionalDispatcher(IDispatcher):
+    """A transactional dispatcher will keep operations in a queue
+    until a transaction boundary."""
+    
+    def commit():
+        """Commit transaction."""
+
+    def clear():
+        """Clear internal state and release transaction manager."""
+
+    def getState():
+        """Return copy of queue state."""
+
+    def setState(state):
+        """Set queue state."""
+
+    
+class IQueueReducer(Interface):
+    """Operation queue optimization.
+
+    This component might be merged with the transactional dispatcher
+    at some point. The motivation for splitting this functionality out
+    seems to primarily be a matter of optional configuration.
+    """
+
+    def optimize(queue):
+        """Remove redundant entries from queue.
+
+        The provided ``queue`` should be a sequence of operations of
+        the form:
+
+           (operator, object, attributes)
+
+        An optimized sequence is returned.
+        """

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/operation.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/operation.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/operation.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,33 @@
+import constants
+
+class Operation(tuple):
+    """An indexing operation; also the tuple ``(op, obj, attributes)``."""
+
+    op = None  # operation code; set by each concrete subclass
+
+    def __new__(cls, obj=None, attributes=None):
+        inst = tuple.__new__(cls, (cls.op, obj, attributes))
+        inst.obj = obj
+        inst.attributes = attributes
+        return inst
+
+    def process(self, dispatcher):
+        raise NotImplementedError("Should be implemented in subclass.")
+
+class Add(Operation):
+    op = constants.INDEX
+
+    def process(self, dispatcher):
+        dispatcher.index(self.obj, self.attributes)
+
+class Modify(Operation):
+    op = constants.REINDEX
+
+    def process(self, dispatcher):
+        dispatcher.reindex(self.obj, self.attributes)
+
+class Delete(Operation):
+    op = constants.UNINDEX
+
+    def process(self, dispatcher):
+        dispatcher.unindex(self.obj)

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,112 @@
+from zope import interface
+from zope import component
+
+from threading import local
+
+from z3c.indexing.dispatch.interfaces import IDispatcher
+from z3c.indexing.dispatch.interfaces import ITransactionalDispatcher
+from z3c.indexing.dispatch.interfaces import IQueueReducer
+
+from z3c.indexing.dispatch.constants import INDEX, REINDEX, UNINDEX
+from z3c.indexing.dispatch.transactions import QueueTM
+
+from z3c.indexing.dispatch import operation
+
+import transaction
+
+from logging import getLogger
+debug = getLogger('z3c.indexing.dispatch.queue').debug
+
+localQueue = None
+
+def getDispatcher():
+    """Return a (thread-local) dispatcher, creating one if necessary."""
+    
+    global localQueue
+    if localQueue is None:
+        localQueue = TransactionalDispatcher()
+    return localQueue
+
+
+class TransactionalDispatcher(local):
+    """An indexing queue."""
+
+    interface.implements(ITransactionalDispatcher)
+
+    tmhook = None
+    
+    def __init__(self):
+        self.queue = []
+    
+    def index(self, obj, attributes=None):
+        debug('adding index operation for %r', obj)
+        self.queue.append(operation.Add(obj, attributes))
+        self._hook()
+
+    def reindex(self, obj, attributes=None):
+        debug('adding reindex operation for %r', obj)
+        self.queue.append(operation.Modify(obj, attributes))
+        self._hook()
+
+    def unindex(self, obj):
+        debug('adding unindex operation for %r', obj)
+        self.queue.append(operation.Delete(obj))
+        self._hook()
+
+    def flush(self):
+        return self.commit()
+
+    def commit(self):
+        """Replay the queue on each adapted dispatcher, then flush them."""
+        self._optimize()
+        dispatchers = set()
+
+        for op, obj, attributes in self.queue:
+            for name, dispatcher in component.getAdapters((self, obj), IDispatcher):
+                if op == INDEX:
+                    dispatcher.index(obj, attributes)
+                elif op == REINDEX:
+                    dispatcher.reindex(obj, attributes)
+                elif op == UNINDEX:
+                    dispatcher.unindex(obj)
+                else:  # %r: a non-integer code (e.g. None) must not break formatting
+                    raise ValueError('Invalid queue operation code: %r' % (op,))
+
+                dispatchers.add(dispatcher)
+
+        self.clear()
+
+        for dispatcher in dispatchers:
+            dispatcher.flush()
+            
+    def clear(self):
+        debug('clearing %d queue item(s)', len(self.queue))
+        del self.queue[:]
+        self.tmhook = None
+
+    def setState(self, state):
+        self.queue = list(state)  # copy (like getState) so clear()/append never mutate the caller's list
+
+    def getState(self):
+        return list(self.queue)
+
+    def __len__(self):
+        return len(self.queue)
+
+    def _hook(self):
+        """Register a hook into the transaction machinery.
+
+        This is to make sure the queue's processing method gets called
+        back just before the transaction is about to be committed.
+        """
+
+        if self.tmhook is None:
+            self.tmhook = QueueTM(self).register
+            
+        self.tmhook()
+
+    def _optimize(self):
+        reducer = component.queryUtility(IQueueReducer)
+        if reducer is not None:
+            self.queue = reducer.optimize(self.queue)
+

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/reducer.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/reducer.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/reducer.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,38 @@
+from logging import getLogger
+from zope.interface import implements
+from z3c.indexing.dispatch.interfaces import IQueueReducer
+from z3c.indexing.dispatch.constants import INDEX, UNINDEX
+
+debug = getLogger('z3c.indexing.dispatch.reducer').debug
+
+class QueueReducer(object):
+    """Reduce a queue of index operations."""
+
+    implements(IQueueReducer)
+
+    def optimize(self, queue):
+        """Collapse ``(operator, object, attributes)`` entries to one per object."""
+        res = {}
+        debug('start reducing %d item(s): %r', len(queue), queue)
+        for iop, obj, iattr in queue:
+            oid = hash(obj)  # NOTE(review): distinct objects with equal hashes share one entry
+            op, dummy, attr = res.get(oid, (0, obj, iattr))
+            # If we are going to delete an item that was added in this transaction, ignore it
+            if op == INDEX and iop == UNINDEX:
+                del res[oid]
+            else:
+                # Operators are -1, 0 or 1 which makes it safe to add them
+                op += iop
+                op = min(max(op,UNINDEX), INDEX) # operator always between -1 and 1
+
+                # Handle attributes, None means all fields, and takes precedence
+                if isinstance(attr, (tuple,list)) and isinstance(iattr, (tuple,list)):
+                    attr = tuple(set(tuple(attr) + tuple(iattr)))  # list + tuple would raise
+                else:
+                    attr = None
+
+                res[oid] = (op, obj, attr)
+
+        debug('finished reducing; %d item(s) in queue...', len(res))
+
+        return list(res.values())

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/__init__.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/__init__.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1 @@
+#

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_queue.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_queue.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_queue.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,269 @@
+from zope import interface
+from zope import component
+
+from unittest import TestSuite, makeSuite, main, TestCase
+from threading import Thread, currentThread
+
+from zope.interface import implements
+from zope.component import provideUtility, provideAdapter
+from zope.testing.cleanup import CleanUp
+
+from z3c.indexing.dispatch.interfaces import IDispatcher
+from z3c.indexing.dispatch.interfaces import ITransactionalDispatcher
+from z3c.indexing.dispatch.interfaces import IQueueReducer
+
+from z3c.indexing.dispatch.reducer import QueueReducer
+from z3c.indexing.dispatch.queue import TransactionalDispatcher
+from z3c.indexing.dispatch.queue import getDispatcher
+from z3c.indexing.dispatch.constants import INDEX, REINDEX, UNINDEX
+from z3c.indexing.dispatch.tests import utils
+
+class QueueTests(CleanUp, TestCase):
+    
+    def setUp(self):
+        self.dispatcher = TransactionalDispatcher()
+
+    def tearDown(self):
+        self.queues = {}
+        self.dispatcher.clear()
+        
+    def _provide_dispatcher(self, name=""):
+        factory = utils.MockDispatcherFactory()
+
+        provideAdapter(
+            factory,
+            (IDispatcher, str),
+            IDispatcher,
+            name=name)
+
+        return factory.queue
+    
+    def testInterface(self):
+        self.failUnless(ITransactionalDispatcher.providedBy(self.dispatcher))
+
+    def testQueueHook(self):
+        class CaptainHook:
+            def __init__(self):
+                self.hooked = 0
+            def __call__(self):
+                self.hooked += 1
+        hook = CaptainHook()
+        dispatcher = self.dispatcher
+        dispatcher.tmhook = hook
+        self.assertEqual(hook.hooked, 0)
+        dispatcher.index('foo')
+        dispatcher.reindex('foo')
+        dispatcher.reindex('bar')
+        self.assertEqual(len(dispatcher.getState()), 3)
+        self.assertEqual(hook.hooked, 3)
+        dispatcher.commit()
+        self.assertEqual(hook.hooked, 3)
+
+    def testQueueState(self):
+        dispatcher = self.dispatcher
+        dispatcher.index('foo')
+        self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None)])
+        state = dispatcher.getState()
+        dispatcher.reindex('bar')
+        self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None), (REINDEX, 'bar', None)])
+        dispatcher.setState(state)
+        self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None)])
+        dispatcher.commit()
+        self.assertEqual(len(dispatcher), 0)
+
+    def testDispatching(self):
+        self.dispatcher.index('foo')
+        queue = self._provide_dispatcher()
+        self.dispatcher.commit()
+        self.assertEqual(self.dispatcher.getState(), [])
+        self.assertEqual(queue, [(INDEX, 'foo', None), 'flush'])
+
+    def testMultipleDispatchers(self):
+        dispatcher = self.dispatcher
+
+        queue1 = self._provide_dispatcher(name='dispatcher1')
+        queue2 = self._provide_dispatcher(name='dispatcher2')
+        
+        dispatcher.index('foo')
+        dispatcher.commit()
+        
+        self.assertEqual(dispatcher.getState(), [])
+        self.assertEqual(queue1, [(INDEX, 'foo', None), 'flush'])
+        self.assertEqual(queue2, [(INDEX, 'foo', None), 'flush'])
+
+    def testQueueOperations(self):
+        dispatcher = self.dispatcher
+        
+        queue = self._provide_dispatcher()
+        
+        dispatcher.index('foo')
+        dispatcher.reindex('foo')
+        dispatcher.unindex('foo')
+
+        dispatcher.commit()
+
+        self.assertEqual(len(dispatcher), 0)
+        self.assertEqual(queue, [(INDEX, 'foo', None), (REINDEX, 'foo', None), (UNINDEX, 'foo', None), 'flush'])
+
+    def testFlush(self):
+        queue = self._provide_dispatcher()
+        
+        self.dispatcher.index('foo')
+        self.dispatcher.commit()
+        
+        self.failUnless('flush' in queue)
+
+    def testQueueReducer(self):
+        class MessyReducer(object):
+            implements(IQueueReducer)
+            def optimize(self, queue):
+                return [ op for op in queue if not op[0] == UNINDEX ]
+        dispatcher = self.dispatcher
+        dispatcher.index('foo')
+        dispatcher.reindex('foo')
+        dispatcher.unindex('foo')
+        dispatcher.index('foo', 'bar')
+        dispatcher._optimize()
+        self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None), (REINDEX, 'foo', None), (UNINDEX, 'foo', None), (INDEX, 'foo', 'bar')])
+        provideUtility(MessyReducer())  # hook up the reducer
+        dispatcher._optimize()                # and try again...
+        self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None), (REINDEX, 'foo', None), (INDEX, 'foo', 'bar')])
+
+    def testRealQueueReducer(self):
+        provideUtility(QueueReducer())
+        dispatcher = self.dispatcher
+        dispatcher.index('foo')
+        dispatcher.reindex('foo')
+        dispatcher.unindex('foo')
+        dispatcher.index('foo', 'bar')
+        dispatcher._optimize()
+        self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None)])
+
+
+class QueueReducerTests(TestCase):
+
+    def testReduceQueue(self):
+        reducer = QueueReducer()
+
+        queue = [(REINDEX, 'A', None), (REINDEX, 'A', None)]
+        self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+        queue = [(INDEX, 'A', None), (REINDEX, 'A', None)]
+        self.failUnlessEqual(reducer.optimize(queue), [(INDEX, 'A', None)])
+
+        queue = [(INDEX, 'A', None), (UNINDEX, 'A', None)]
+        self.failUnlessEqual(reducer.optimize(queue), [])
+
+        queue = [(UNINDEX, 'A', None), (INDEX, 'A', None)]
+        self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+    def testReduceQueueWithAttributes(self):
+        reducer = QueueReducer()
+
+        queue = [(REINDEX, 'A', None), (REINDEX, 'A', ('a','b'))]
+        self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+        queue = [(REINDEX, 'A', ('a','b')), (REINDEX, 'A', None)]
+        self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+        queue = [(REINDEX, 'A', ('a','b')), (REINDEX, 'A', ('b','c'))]
+        self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', ('a', 'c', 'b'))])
+
+        queue = [(INDEX, 'A', None), (REINDEX, 'A', None)]
+        self.failUnlessEqual(reducer.optimize(queue), [(INDEX, 'A', None)])
+
+        queue = [(REINDEX, 'A', ('a','b')), (UNINDEX, 'A', None), (INDEX, 'A', None)]
+        self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+
+class QueueThreadTests(TestCase):
+    """ thread tests modeled after zope.thread doctests """
+
+    def setUp(self):
+        self.dispatcher = getDispatcher()
+
+    def tearDown(self):
+        self.dispatcher.clear()
+
+    def testLocalQueues(self):
+        me = self.dispatcher                    # get the queued indexer...
+        other = []
+        def runner():                   # and a callable for the thread to run...
+            me.reindex('bar')
+            other[:] = me.getState()
+        thread = Thread(target=runner)  # another thread is created...
+        thread.start()                  # and started...
+        while thread.isAlive(): '...'   # wait until it's done...
+        self.assertEqual(other, [(REINDEX, 'bar', None)])
+        self.assertEqual(me.getState(), [])
+        me.index('foo')                 # something happening on our side...
+        self.assertEqual(other, [(REINDEX, 'bar', None)])
+        self.assertEqual(me.getState(), [(INDEX, 'foo', None)])
+        thread.join()                   # finally the threads are re-united...
+
+    def testQueuesOnTwoThreads(self):
+        me = self.dispatcher                    # get the queued indexer...
+        first = []
+        def runner1():                  # and callables for the first...
+            me.index('foo')
+            first[:] = me.getState()
+        thread1 = Thread(target=runner1)
+        second = []
+        def runner2():                  # and second thread
+            me.index('bar')
+            second[:] = me.getState()
+        thread2 = Thread(target=runner2)
+        self.assertEqual(first,  [])    # clean table before we start...
+        self.assertEqual(second, [])
+        self.assertEqual(me.getState(), [])
+        thread1.start()                 # do stuff here...
+        thread1.join()                  # wait, so `first` is filled in...
+        self.assertEqual(first,  [(INDEX, 'foo', None)])
+        self.assertEqual(second, [])
+        self.assertEqual(me.getState(), [])
+        thread2.start()                 # and there...
+        thread2.join()                  # wait, so `second` is filled in...
+        self.assertEqual(first,  [(INDEX, 'foo', None)])
+        self.assertEqual(second, [(INDEX, 'bar', None)])
+        self.assertEqual(me.getState(), [])
+        me.unindex('f00')               # something happens on our side...
+        self.assertEqual(first,  [(INDEX, 'foo', None)])
+        self.assertEqual(second, [(INDEX, 'bar', None)])
+        self.assertEqual(me.getState(), [(UNINDEX, 'f00', None)])
+        me.unindex('f00')               # and happens again...
+        self.assertEqual(first,  [(INDEX, 'foo', None)])
+        self.assertEqual(second, [(INDEX, 'bar', None)])
+        self.assertEqual(me.getState(), [(UNINDEX, 'f00', None), (UNINDEX, 'f00', None)])
+
+    def testManyThreads(self):
+        me = self.dispatcher                  # get the queued indexer...
+        queues = {}                     # container for local queues
+        def makeRunner(name, idx):
+            def runner():
+                for n in range(idx):    # index idx times
+                    me.index(name)
+                queues[currentThread()] = me.queue
+            return runner
+        threads = []
+        for idx in range(99):
+            threads.append(Thread(target=makeRunner('t%d' % idx, idx)))
+        for thread in threads:
+            thread.start()
+        for thread in threads:
+            thread.join()
+        for idx, thread in enumerate(threads):
+            tid = 't%d' % idx
+            queue = queues[thread]
+            names = [ name for op, name, attrs in queue ]
+            self.assertEquals(names, [tid] * idx)
+
+
+def test_suite():
+    return TestSuite([
+        makeSuite(QueueTests),
+        makeSuite(QueueReducerTests),
+        makeSuite(QueueThreadTests),
+    ])
+
+if __name__ == '__main__':
+    main(defaultTest='test_suite')

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,53 @@
+from unittest import TestCase, TestSuite, makeSuite, main
+from transaction import savepoint, commit, abort
+
+from z3c.indexing.dispatch.transactions import QueueTM
+from z3c.indexing.dispatch.constants import INDEX, REINDEX
+from z3c.indexing.dispatch.tests import utils
+
+class QueueTransactionManagerTests(TestCase):
+
+    def setUp(self):
+        self.queue = utils.MockTransactionalDispatcher()
+        self.tman = QueueTM(self.queue)
+        self.queue._hook = self.tman.register    # set up the transaction manager hook
+
+    def testFlushQueueOnCommit(self):
+        self.queue.index('foo')
+        commit()
+        self.assertEqual(self.queue.getState(), [])
+        self.assertEqual(self.queue.processed, [(INDEX, 'foo', None)])
+
+    def testFlushQueueOnAbort(self):
+        self.queue.index('foo')
+        abort()
+        self.assertEqual(self.queue.getState(), [])
+        self.assertEqual(self.queue.processed, None)
+
+    def testUseSavePoint(self):
+        self.queue.index('foo')
+        savepoint()
+        self.queue.reindex('bar')
+        commit()
+        self.assertEqual(self.queue.getState(), [])
+        self.assertEqual(self.queue.processed, [(INDEX, 'foo', None), (REINDEX, 'bar', None)])
+
+    def testRollbackSavePoint(self):
+        self.queue.index('foo')
+        sp = savepoint()
+        self.queue.reindex('bar')
+        sp.rollback()
+        commit()
+        self.assertEqual(self.queue.getState(), [])
+        self.assertEqual(self.queue.processed, [(INDEX, 'foo', None)])
+
+
+def test_suite():
+    return TestSuite([
+        makeSuite(QueueTransactionManagerTests),
+    ])
+
+if __name__ == '__main__':
+    main(defaultTest='test_suite')
+
+

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/utils.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/utils.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/utils.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,68 @@
+from zope import interface
+
+from z3c.indexing.dispatch.interfaces import IDispatcher, ITransactionalDispatcher
+from z3c.indexing.dispatch.constants import INDEX, REINDEX, UNINDEX
+
+class MockDispatcherFactory(object):
+    def __init__(self):
+        self.queue = []
+        self.dispatcher = MockDispatcher()
+        
+    def __call__(self, *args):
+        self.dispatcher.queue = self.queue
+        return self.dispatcher
+        
+class MockDispatcher(object):
+    interface.implements(IDispatcher)
+
+    def __init__(self):
+        self.queue = []
+
+    def index(self, obj, attributes=None):
+        self.queue.append((INDEX, obj, attributes))
+
+    def reindex(self, obj, attributes=None):
+        self.queue.append((REINDEX, obj, attributes))
+
+    def unindex(self, obj):
+        self.queue.append((UNINDEX, obj, None))
+
+    def flush(self):
+        self.queue.append('flush')
+        
+class MockTransactionalDispatcher(MockDispatcher):
+    interface.implements(ITransactionalDispatcher)
+
+    processed = None
+    _hook = lambda self: 42
+
+    def index(self, obj, attributes=None):
+        super(MockTransactionalDispatcher, self).index(obj, attributes)
+        self._hook()
+
+    def reindex(self, obj, attributes=None):
+        super(MockTransactionalDispatcher, self).reindex(obj, attributes)
+        self._hook()
+
+    def unindex(self, obj):
+        super(MockTransactionalDispatcher, self).unindex(obj)
+        self._hook()
+
+    def getState(self):
+        return list(self.queue)
+
+    def setState(self, state):
+        self.queue = state
+
+    def optimize(self):
+        pass
+
+    def commit(self):
+        self.processed = self.queue
+        self.clear()
+        
+    def clear(self):
+        self.queue = []
+
+    def __len__(self):
+        return len(self.queue)

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py	2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,65 @@
+from zope import interface
+
+from transaction.interfaces import ISavepointDataManager
+from transaction import get as getTransaction
+
+from interfaces import ITransactionalDispatcher
+
+from threading import local
+
+import logging
+logger = logging.getLogger('z3c.indexing.dispatch.transactions')
+
+class QueueSavepoint:
+    """Transaction savepoints using the ITransactionalDispatcher interface."""
+
+    def __init__(self, queue):
+        self.queue = queue
+        self.state = queue.getState()
+
+    def rollback(self):
+        self.queue.setState(self.state)
+
+
+class QueueTM(local):
+    """Transaction manager hook for the transactional dispatcher."""
+    
+    interface.implements(ISavepointDataManager)
+
+    def __init__(self, queue):
+        local.__init__(self)
+        self.registered = False
+        self.vote = False
+        assert ITransactionalDispatcher.providedBy(queue), queue
+        self.queue = queue
+
+    def register(self):
+        if not self.registered:
+            getTransaction().join(self)
+            self.registered = True
+            
+    def savepoint(self):
+        return QueueSavepoint(self.queue)
+
+    def tpc_begin(self, transaction):
+        pass
+
+    def commit(self, transaction):
+        self.queue.commit()
+
+    def tpc_vote(self, transaction):
+        pass
+
+    def tpc_finish(self, transaction):
+        self.registered = False
+
+    def tpc_abort(self, transaction):
+        if len(self.queue):
+            logger.debug('emptying unprocessed queue due to abort()...')
+        self.queue.clear()
+        self.registered = False
+
+    abort = tpc_abort
+
+    def sortKey(self):
+        return id(self)



More information about the Checkins mailing list