[Checkins] SVN: z3c.indexing.dispatch/trunk/ Defer indexing to after request has ended; added default component configuration file.

Malthe Borch mborch at gmail.com
Sun Mar 30 20:08:48 EDT 2008


Log message for revision 85024:
  Defer indexing to after request has ended; added default component configuration file.

Changed:
  U   z3c.indexing.dispatch/trunk/README.txt
  U   z3c.indexing.dispatch/trunk/setup.py
  U   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt
  U   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py
  A   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/configure.zcml
  U   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py
  U   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py
  U   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py
  U   z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py

-=-
Modified: z3c.indexing.dispatch/trunk/README.txt
===================================================================
--- z3c.indexing.dispatch/trunk/README.txt	2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/README.txt	2008-03-31 00:08:46 UTC (rev 85024)
@@ -2,3 +2,12 @@
 ========
 
 This package implements a transaction-safe indexing dispatcher.
+
+* Pluggable indexing architecture
+
+* Indexing operations are deferred until right after the request has
+  ended, exhibiting asynchronous behavior. All operations are carried
+  out in the thread that committed the transaction.
+
+* The operation queue is optimized to avoid unnecessary indexing.
+

Modified: z3c.indexing.dispatch/trunk/setup.py
===================================================================
--- z3c.indexing.dispatch/trunk/setup.py	2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/setup.py	2008-03-31 00:08:46 UTC (rev 85024)
@@ -29,6 +29,7 @@
           'zope.interface',
           'zope.component',
           'zope.testing',
+          'zope.app.publication',
           'transaction',
           # -*- Extra requirements: -*-
       ],

Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt	2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt	2008-03-31 00:08:46 UTC (rev 85024)
@@ -1,13 +1,15 @@
 z3c.indexing.dispatch
 =====================
 
-The indexing dispatcher is the main entry point for indexing content.
+The indexing dispatcher is the main entry point for indexing content
+using the ``z3c.indexing`` architecture.
 
-A dispatcher must implement three basic operations (defined in the
-``IDispatcher`` interface): index, reindex and unindex.
+A dispatcher must implement the three basic indexing operations
+defined in the ``IDispatcher`` interface (index, reindex and
+unindex) as well as a flush method.
 
-Dispatching flow
-----------------
+Configuration
+-------------
 
 Dispatchers can perform indexing operations directly or defer work to
 other dispatchers using the following lookup pattern:
@@ -17,11 +19,15 @@
 Example dispatching flows:
 
   transactional dispatcher -> zcatalog
-  transactional dispatcher -> async -> xapian
+  transactional dispatcher -> custom filter -> xapian
 
-Transactional dispatching
--------------------------
+Operation
+---------
 
-The transactional dispatcher will queue indexing operations while
-waiting for the transaction boundary; then pass the operations on to
-the next set of dispatchers.
+The transactional dispatcher will queue indexing operations and only
+actually carry them out after the request has ended*. A transaction
+manager makes sure the queue only contains operations corresponding to
+a successful transaction.
+
+*) This is currently the only supported behavior; an option should be
+ available to carry out operations before the request ends.

Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py	2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py	2008-03-31 00:08:46 UTC (rev 85024)
@@ -1 +1 @@
-#
+from queue import TransactionalDispatcher

Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/configure.zcml
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/configure.zcml	                        (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/configure.zcml	2008-03-31 00:08:46 UTC (rev 85024)
@@ -0,0 +1,16 @@
+<configure
+    xmlns="http://namespaces.zope.org/zope">
+
+  <!-- register dispatcher as thread-local global utility -->
+  
+  <utility factory=".queue.getDispatcher" />
+
+  <!-- configure queue reducer -->
+  
+  <utility factory=".reducer.QueueReducer" />
+
+  <!-- set queue to commit after request ends -->
+  
+  <subscriber handler=".queue.commit" />
+  
+</configure>

Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py	2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py	2008-03-31 00:08:46 UTC (rev 85024)
@@ -1,5 +1,12 @@
 from zope.interface import Interface
 
+class IIndexable(Interface):
+    """Marker-interface for indexable objects.
+
+    Typically this interface will be set on objects that *should* be
+    indexed, regardless of capability.
+    """
+
 class IDispatcher(Interface):
     """Interface for dispatching indexing operations.
 
@@ -8,18 +15,17 @@
     """
 
     def index(obj, attributes=None):
-        """Queue an index operation for the given object and attributes."""
+        """Queue an index operation, optionally passing attributes."""
 
     def reindex(obj, attributes=None):
-        """Queue a reindex operation for the given object and attributes."""
+        """Queue a reindex operation, optionally passing attributes."""
 
     def unindex(obj):
-        """Queue an unindex operation for the given object."""
+        """Queue an unindex operation."""
 
-    def flush(obj):
+    def flush():
         """Flush queue."""
         
-
 class ITransactionalDispatcher(IDispatcher):
     """A transactional dispatcher will keep operations in a queue
     until a transaction boundary."""
@@ -35,7 +41,6 @@
 
     def setState(state):
         """Set queue state."""
-
     
 class IQueueReducer(Interface):
     """Operation queue optimization.
@@ -51,7 +56,7 @@
         The provided ``queue`` should be a sequence of operations on
         the form:
 
-           (operator, object, attributes)
+           (operator, obj, attributes)
 
         An optimized sequence is returned.
         """

Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py	2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py	2008-03-31 00:08:46 UTC (rev 85024)
@@ -3,22 +3,25 @@
 
 from threading import local
 
+from zope.app.publication.interfaces import IEndRequestEvent
+
+from ZODB.interfaces import IConnection
+
 from z3c.indexing.dispatch.interfaces import IDispatcher
 from z3c.indexing.dispatch.interfaces import ITransactionalDispatcher
 from z3c.indexing.dispatch.interfaces import IQueueReducer
-
 from z3c.indexing.dispatch.constants import INDEX, REINDEX, UNINDEX
 from z3c.indexing.dispatch.transactions import QueueTM
-
 from z3c.indexing.dispatch import operation
 
 import transaction
 
 from logging import getLogger
-debug = getLogger('z3c.indexing.dispatch.queue').debug
+debug = getLogger('z3c.indexing.dispatch.queue').info
 
 localQueue = None
 
+@interface.implementer(ITransactionalDispatcher)
 def getDispatcher():
     """Return a (thread-local) dispatcher, creating one if necessary."""
     
@@ -27,6 +30,9 @@
         localQueue = TransactionalDispatcher()
     return localQueue
 
+@component.adapter(IEndRequestEvent)
+def commit(ev):
+    getDispatcher().commit()
 
 class TransactionalDispatcher(local):
     """An indexing queue."""
@@ -39,17 +45,14 @@
         self.queue = []
     
     def index(self, obj, attributes=None):
-        debug('adding index operation for %r', obj)
         self.queue.append(operation.Add(obj, attributes))
         self._hook()
 
     def reindex(self, obj, attributes=None):
-        debug('adding reindex operation for %r', obj)
         self.queue.append(operation.Modify(obj, attributes))
         self._hook()
 
     def unindex(self, obj):
-        debug('adding unindex operation for %r', obj)
         self.queue.append(operation.Delete(obj))
         self._hook()
 
@@ -61,7 +64,12 @@
 
         dispatchers = set()
 
-        for op, obj, attributes in self.queue:            
+        for op, obj, attributes in self.queue:
+            conn = IConnection(obj, None)
+                
+            if conn is not None:
+                conn.open()
+                
             for name, dispatcher in component.getAdapters((self, obj), IDispatcher):
                 if op == INDEX:
                     dispatcher.index(obj, attributes)
@@ -78,7 +86,9 @@
 
         for dispatcher in dispatchers:
             dispatcher.flush()
-            
+
+        transaction.commit()
+        
     def clear(self):
         debug('clearing %d queue item(s)', len(self.queue))
         del self.queue[:]
@@ -96,8 +106,8 @@
     def _hook(self):
         """Register a hook into the transaction machinery.
 
-        This is to make sure the queue's processing method gets called
-        back just before the transaction is about to be committed.
+        The indexing operations in the queue should be carried out if
+        and only if the transaction to which they belong is committed.
         """
 
         if self.tmhook is None:

Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py	2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py	2008-03-31 00:08:46 UTC (rev 85024)
@@ -15,8 +15,7 @@
     def testFlushQueueOnCommit(self):
         self.queue.index('foo')
         commit()
-        self.assertEqual(self.queue.getState(), [])
-        self.assertEqual(self.queue.processed, [(INDEX, 'foo', None)])
+        self.assertEqual(self.queue.getState(), [(INDEX, 'foo', None)])
 
     def testFlushQueueOnAbort(self):
         self.queue.index('foo')
@@ -29,8 +28,7 @@
         savepoint()
         self.queue.reindex('bar')
         commit()
-        self.assertEqual(self.queue.getState(), [])
-        self.assertEqual(self.queue.processed, [(INDEX, 'foo', None), (REINDEX, 'bar', None)])
+        self.assertEqual(self.queue.getState(), [(INDEX, 'foo', None), (REINDEX, 'bar', None)])
 
     def testRollbackSavePoint(self):
         self.queue.index('foo')
@@ -38,8 +36,7 @@
         self.queue.reindex('bar')
         sp.rollback()
         commit()
-        self.assertEqual(self.queue.getState(), [])
-        self.assertEqual(self.queue.processed, [(INDEX, 'foo', None)])
+        self.assertEqual(self.queue.getState(), [(INDEX, 'foo', None)])
 
 
 def test_suite():

Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py	2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py	2008-03-31 00:08:46 UTC (rev 85024)
@@ -7,9 +7,6 @@
 
 from threading import local
 
-import logging
-logger = logging.getLogger('z3c.indexing.dispatch.transactions')
-
 class QueueSavepoint:
     """Transaction savepoints using the ITransactionalDispatcher interface."""
 
@@ -20,9 +17,8 @@
     def rollback(self):
         self.queue.setState(self.state)
 
-
 class QueueTM(local):
-    """Transaction manager hook for the transactional dispatcher."""
+    """Transaction manager for the transactional dispatcher."""
     
     interface.implements(ISavepointDataManager)
 
@@ -45,7 +41,7 @@
         pass
 
     def commit(self, transaction):
-        self.queue.commit()
+        pass
 
     def tpc_vote(self, transaction):
         pass
@@ -54,8 +50,6 @@
         self.registered = False
 
     def tpc_abort(self, transaction):
-        if len(self.queue):
-            logger.debug('emptying unprocessed queue due to abort()...')
         self.queue.clear()
         self.registered = False
 



More information about the Checkins mailing list