[Checkins] SVN: z3c.indexing.dispatch/trunk/ Defer indexing to
after request has ended; added default component configuration file.
Malthe Borch
mborch at gmail.com
Sun Mar 30 20:08:48 EDT 2008
Log message for revision 85024:
Defer indexing to after request has ended; added default component configuration file.
Changed:
U z3c.indexing.dispatch/trunk/README.txt
U z3c.indexing.dispatch/trunk/setup.py
U z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt
U z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/configure.zcml
U z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py
U z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py
U z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py
U z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py
-=-
Modified: z3c.indexing.dispatch/trunk/README.txt
===================================================================
--- z3c.indexing.dispatch/trunk/README.txt 2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/README.txt 2008-03-31 00:08:46 UTC (rev 85024)
@@ -2,3 +2,12 @@
========
This package implements a transaction-safe indexing dispatcher.
+
+* Pluggable indexing architecture
+
+* Indexing operations are deferred to right after the request has
+ended, exhibiting asynchronous behavior. All operations are carried
+out in the thread that committed the tranaction.
+
+* The operation queue is optimized to avoid unnecessary indexing.
+
Modified: z3c.indexing.dispatch/trunk/setup.py
===================================================================
--- z3c.indexing.dispatch/trunk/setup.py 2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/setup.py 2008-03-31 00:08:46 UTC (rev 85024)
@@ -29,6 +29,7 @@
'zope.interface',
'zope.component',
'zope.testing',
+ 'zope.app.publication',
'transaction',
# -*- Extra requirements: -*-
],
Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt 2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt 2008-03-31 00:08:46 UTC (rev 85024)
@@ -1,13 +1,15 @@
z3c.indexing.dispatch
=====================
-The indexing dispatcher is the main entry point for indexing content.
+The indexing dispatcher is the main entry point for indexing content
+using the ``z3c.indexing`` architecture.
-A dispatcher must implement three basic operations (defined in the
-``IDispatcher`` interface): index, reindex and unindex.
+A dispatcher must implement the three basic indexing operations
+(defined in the ``IDispatcher`` interface), index, reindex and
+unindex, as well as a flush-method.
-Dispatching flow
-----------------
+Configuration
+-------------
Dispatchers can perform indexing operations directly or defer work to
other dispatchers using the following lookup pattern:
@@ -17,11 +19,15 @@
Example dispatching flows:
transactional dispatcher -> zcatalog
- transactional dispatcher -> async -> xapian
+ transactional dispatcher -> custom filter -> xapian
-Transactional dispatching
--------------------------
+Operation
+---------
-The transactional dispatcher will queue indexing operations while
-waiting for the transaction boundary; then pass the operations on to
-the next set of dispatchers.
+The transactional dispatcher will queue indexing operations and only
+actually carry them out after the request has ended*. A transaction
+manager makes sure the queue only contains operations corresponding to
+a succesful transaction.
+
+*) This is currently the only supported behavior; an option should be
+ available to carry out operations before the request ends.
Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py 2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py 2008-03-31 00:08:46 UTC (rev 85024)
@@ -1 +1 @@
-#
+from queue import TransactionalDispatcher
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/configure.zcml
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/configure.zcml (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/configure.zcml 2008-03-31 00:08:46 UTC (rev 85024)
@@ -0,0 +1,16 @@
+<configure
+ xmlns="http://namespaces.zope.org/zope">
+
+ <!-- register dispatcher as thread-local global utility -->
+
+ <utility factory=".queue.getDispatcher" />
+
+ <!-- configure queue reducer -->
+
+ <utility factory=".reducer.QueueReducer" />
+
+ <!-- set queue to commit after request ends -->
+
+ <subscriber handler=".queue.commit" />
+
+</configure>
Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py 2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py 2008-03-31 00:08:46 UTC (rev 85024)
@@ -1,5 +1,12 @@
from zope.interface import Interface
+class IIndexable(Interface):
+ """Marker-interface for indexable objects.
+
+ Typically this interface will be set on objects that *should* be
+ indexed, regardless of capability.
+ """
+
class IDispatcher(Interface):
"""Interface for dispatching indexing operations.
@@ -8,18 +15,17 @@
"""
def index(obj, attributes=None):
- """Queue an index operation for the given object and attributes."""
+ """Queue an index operation, optionally passing attributes."""
def reindex(obj, attributes=None):
- """Queue a reindex operation for the given object and attributes."""
+ """Queue a reindex operation, optionally passing attributes."""
def unindex(obj):
- """Queue an unindex operation for the given object."""
+ """Queue an unindex operation."""
- def flush(obj):
+ def flush():
"""Flush queue."""
-
class ITransactionalDispatcher(IDispatcher):
"""A transactional dispatcher will keep operations in a queue
until a transaction boundary."""
@@ -35,7 +41,6 @@
def setState(state):
"""Set queue state."""
-
class IQueueReducer(Interface):
"""Operation queue optimization.
@@ -51,7 +56,7 @@
The provided ``queue`` should be a sequence of operations on
the form:
- (operator, object, attributes)
+ (operator, obj, attributes)
An optimized sequence is returned.
"""
Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py 2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py 2008-03-31 00:08:46 UTC (rev 85024)
@@ -3,22 +3,25 @@
from threading import local
+from zope.app.publication.interfaces import IEndRequestEvent
+
+from ZODB.interfaces import IConnection
+
from z3c.indexing.dispatch.interfaces import IDispatcher
from z3c.indexing.dispatch.interfaces import ITransactionalDispatcher
from z3c.indexing.dispatch.interfaces import IQueueReducer
-
from z3c.indexing.dispatch.constants import INDEX, REINDEX, UNINDEX
from z3c.indexing.dispatch.transactions import QueueTM
-
from z3c.indexing.dispatch import operation
import transaction
from logging import getLogger
-debug = getLogger('z3c.indexing.dispatch.queue').debug
+debug = getLogger('z3c.indexing.dispatch.queue').info
localQueue = None
+ at interface.implementer(ITransactionalDispatcher)
def getDispatcher():
"""Return a (thread-local) dispatcher, creating one if necessary."""
@@ -27,6 +30,9 @@
localQueue = TransactionalDispatcher()
return localQueue
+ at component.adapter(IEndRequestEvent)
+def commit(ev):
+ getDispatcher().commit()
class TransactionalDispatcher(local):
"""An indexing queue."""
@@ -39,17 +45,14 @@
self.queue = []
def index(self, obj, attributes=None):
- debug('adding index operation for %r', obj)
self.queue.append(operation.Add(obj, attributes))
self._hook()
def reindex(self, obj, attributes=None):
- debug('adding reindex operation for %r', obj)
self.queue.append(operation.Modify(obj, attributes))
self._hook()
def unindex(self, obj):
- debug('adding unindex operation for %r', obj)
self.queue.append(operation.Delete(obj))
self._hook()
@@ -61,7 +64,12 @@
dispatchers = set()
- for op, obj, attributes in self.queue:
+ for op, obj, attributes in self.queue:
+ conn = IConnection(obj, None)
+
+ if conn is not None:
+ conn.open()
+
for name, dispatcher in component.getAdapters((self, obj), IDispatcher):
if op == INDEX:
dispatcher.index(obj, attributes)
@@ -78,7 +86,9 @@
for dispatcher in dispatchers:
dispatcher.flush()
-
+
+ transaction.commit()
+
def clear(self):
debug('clearing %d queue item(s)', len(self.queue))
del self.queue[:]
@@ -96,8 +106,8 @@
def _hook(self):
"""Register a hook into the transaction machinery.
- This is to make sure the queue's processing method gets called
- back just before the transaction is about to be committed.
+ The indexing operations in the queue should be carried out if
+ and only if the transaction to which they belong is committed.
"""
if self.tmhook is None:
Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py 2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py 2008-03-31 00:08:46 UTC (rev 85024)
@@ -15,8 +15,7 @@
def testFlushQueueOnCommit(self):
self.queue.index('foo')
commit()
- self.assertEqual(self.queue.getState(), [])
- self.assertEqual(self.queue.processed, [(INDEX, 'foo', None)])
+ self.assertEqual(self.queue.getState(), [(INDEX, 'foo', None)])
def testFlushQueueOnAbort(self):
self.queue.index('foo')
@@ -29,8 +28,7 @@
savepoint()
self.queue.reindex('bar')
commit()
- self.assertEqual(self.queue.getState(), [])
- self.assertEqual(self.queue.processed, [(INDEX, 'foo', None), (REINDEX, 'bar', None)])
+ self.assertEqual(self.queue.getState(), [(INDEX, 'foo', None), (REINDEX, 'bar', None)])
def testRollbackSavePoint(self):
self.queue.index('foo')
@@ -38,8 +36,7 @@
self.queue.reindex('bar')
sp.rollback()
commit()
- self.assertEqual(self.queue.getState(), [])
- self.assertEqual(self.queue.processed, [(INDEX, 'foo', None)])
+ self.assertEqual(self.queue.getState(), [(INDEX, 'foo', None)])
def test_suite():
Modified: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py 2008-03-30 23:51:02 UTC (rev 85023)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py 2008-03-31 00:08:46 UTC (rev 85024)
@@ -7,9 +7,6 @@
from threading import local
-import logging
-logger = logging.getLogger('z3c.indexing.dispatch.transactions')
-
class QueueSavepoint:
"""Transaction savepoints using the ITransactionalDispatcher interface."""
@@ -20,9 +17,8 @@
def rollback(self):
self.queue.setState(self.state)
-
class QueueTM(local):
- """Transaction manager hook for the transactional dispatcher."""
+ """Transaction manager for the transactional dispatcher."""
interface.implements(ISavepointDataManager)
@@ -45,7 +41,7 @@
pass
def commit(self, transaction):
- self.queue.commit()
+ pass
def tpc_vote(self, transaction):
pass
@@ -54,8 +50,6 @@
self.registered = False
def tpc_abort(self, transaction):
- if len(self.queue):
- logger.debug('emptying unprocessed queue due to abort()...')
self.queue.clear()
self.registered = False
More information about the Checkins
mailing list