[Checkins] SVN: z3c.indexing.async/ Initial import.

Malthe Borch mborch at gmail.com
Sat Mar 29 07:20:50 EDT 2008


Log message for revision 85010:
  Initial import.

Changed:
  A   z3c.indexing.async/
  A   z3c.indexing.async/trunk/
  A   z3c.indexing.async/trunk/AUTHOR.txt
  A   z3c.indexing.async/trunk/README.txt
  A   z3c.indexing.async/trunk/bootstrap.py
  A   z3c.indexing.async/trunk/buildout.cfg
  A   z3c.indexing.async/trunk/setup.py
  A   z3c.indexing.async/trunk/src/
  A   z3c.indexing.async/trunk/src/z3c/
  A   z3c.indexing.async/trunk/src/z3c/__init__.py
  A   z3c.indexing.async/trunk/src/z3c/indexing/
  A   z3c.indexing.async/trunk/src/z3c/indexing/__init__.py
  A   z3c.indexing.async/trunk/src/z3c/indexing/async/
  A   z3c.indexing.async/trunk/src/z3c/indexing/async/README.txt
  A   z3c.indexing.async/trunk/src/z3c/indexing/async/__init__.py
  A   z3c.indexing.async/trunk/src/z3c/indexing/async/dispatcher.py
  A   z3c.indexing.async/trunk/src/z3c/indexing/async/queue.py
  A   z3c.indexing.async/trunk/src/z3c/indexing/async/tests.py

-=-
Added: z3c.indexing.async/trunk/AUTHOR.txt
===================================================================
--- z3c.indexing.async/trunk/AUTHOR.txt	                        (rev 0)
+++ z3c.indexing.async/trunk/AUTHOR.txt	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,4 @@
+Authors
+=======
+
+Malthe Borch, Sylvian Viollon, Kapil Thangavelu

Added: z3c.indexing.async/trunk/README.txt
===================================================================
--- z3c.indexing.async/trunk/README.txt	                        (rev 0)
+++ z3c.indexing.async/trunk/README.txt	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,4 @@
+Overview
+--------
+
+Provides an asynchronous indexing dispatcher.

Added: z3c.indexing.async/trunk/bootstrap.py
===================================================================
--- z3c.indexing.async/trunk/bootstrap.py	                        (rev 0)
+++ z3c.indexing.async/trunk/bootstrap.py	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Bootstrap a buildout-based project
+
+Simply run this script in a directory containing a buildout.cfg.
+The script accepts buildout command-line options, so you can
+use the -c option to specify an alternate configuration file.
+
+$Id: bootstrap.py 71627 2006-12-20 16:46:11Z jim $
+"""
+
+import os, shutil, sys, tempfile, urllib2
+
+tmpeggs = tempfile.mkdtemp()
+
+ez = {}
+exec urllib2.urlopen('http://peak.telecommunity.com/dist/ez_setup.py'
+                     ).read() in ez
+ez['use_setuptools'](to_dir=tmpeggs, download_delay=0)
+
+import pkg_resources
+
+cmd = 'from setuptools.command.easy_install import main; main()'
+if sys.platform == 'win32':
+    cmd = '"%s"' % cmd # work around spawn lamosity on windows
+
+ws = pkg_resources.working_set
+assert os.spawnle(
+    os.P_WAIT, sys.executable, sys.executable,
+    '-c', cmd, '-mqNxd', tmpeggs, 'zc.buildout',
+    dict(os.environ,
+         PYTHONPATH=
+         ws.find(pkg_resources.Requirement.parse('setuptools')).location
+         ),
+    ) == 0
+
+ws.add_entry(tmpeggs)
+ws.require('zc.buildout')
+import zc.buildout.buildout
+zc.buildout.buildout.main(sys.argv[1:] + ['bootstrap'])
+shutil.rmtree(tmpeggs)

Added: z3c.indexing.async/trunk/buildout.cfg
===================================================================
--- z3c.indexing.async/trunk/buildout.cfg	                        (rev 0)
+++ z3c.indexing.async/trunk/buildout.cfg	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,9 @@
+[buildout]
+develop = . ../z3c.indexing.dispatch
+parts = test
+
+find-links = http://download.zope.org/distribution/
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = z3c.indexing.async
\ No newline at end of file

Added: z3c.indexing.async/trunk/setup.py
===================================================================
--- z3c.indexing.async/trunk/setup.py	                        (rev 0)
+++ z3c.indexing.async/trunk/setup.py	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,38 @@
+from setuptools import setup, find_packages
+import sys, os
+
+version = '0.1'
+
+setup(name='z3c.indexing.async',
+      version=version,
+      description="Asynchronous operation dispatching support.",
+      long_description=open('README.txt').read(),
+      classifiers=[
+        "Framework :: Plone",
+        "Framework :: Zope2",
+        "Framework :: Zope3",
+        "Programming Language :: Python",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+        ],
+      keywords='',
+      author='Zope Corporation and Contributors',
+      author_email='zope3-dev at zope.org',
+      url='',
+      license='ZPL',
+      packages=find_packages('src'),
+      package_dir={'': 'src'},
+      namespace_packages=['z3c', 'z3c.indexing'],
+      include_package_data=True,
+      zip_safe=False,
+      install_requires=[
+          'setuptools',
+          'zope.interface',
+          'zope.component',
+          'zope.testing',
+          'z3c.indexing.dispatch',
+          # -*- Extra requirements: -*-
+      ],
+      entry_points="""
+      # -*- Entry points: -*-
+      """,
+      )

Added: z3c.indexing.async/trunk/src/z3c/__init__.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/__init__.py	                        (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/__init__.py	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+try:
+    __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+    from pkgutil import extend_path
+    __path__ = extend_path(__path__, __name__)

Added: z3c.indexing.async/trunk/src/z3c/indexing/__init__.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/__init__.py	                        (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/__init__.py	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+try:
+    __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+    from pkgutil import extend_path
+    __path__ = extend_path(__path__, __name__)

Added: z3c.indexing.async/trunk/src/z3c/indexing/async/README.txt
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/README.txt	                        (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/README.txt	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,67 @@
+z3c.indexing.async
+==================
+
+The asynchronous dispatcher passes operations on to a worker thread
+which is initialized with a connection object.
+
+Let's start the queue.
+
+  >>> from z3c.indexing.async import queue
+  >>> queue.QueueProcessor.FLUSH_TIMEOUT = 0.5
+  >>> queue.QueueProcessor.start()
+  <z3c.indexing.async.queue.QueueProcessor object at ...>
+
+The asynchronous dispatcher leaves operations up to other
+dispatchers. We'll provide a mock implementation and register it as a
+component.
+
+  >>> class MockDispatcher(object):
+  ...     def __init__(self):
+  ...          self.queue = []
+  ...
+  ...     def index(self, obj, attributes=None):
+  ...         self.queue.append((obj, attributes))
+  ...
+  ...     def flush(self):
+  ...         print "Flushing queue: %s" % str(self.queue)
+  ...         del self.queue[:]
+  
+We'll provide this dispatcher for string items.
+  
+  >>> from z3c.indexing.dispatch.interfaces import IDispatcher
+  >>> _dispatcher = MockDispatcher()
+  >>> component.provideAdapter(
+  ...     lambda *args: _dispatcher, (IDispatcher, str), IDispatcher)
+
+  >>> from z3c.indexing.async.dispatcher import AsynchronousDispatcher
+  >>> dispatcher = AsynchronousDispatcher()
+
+Index some strings:
+  
+  >>> dispatcher.index('rabbit')
+  >>> dispatcher.index('elephant')
+    
+Wait for the timeout (set to 0.5 seconds in this test)...
+
+  >>> import time
+  >>> time.sleep(0.6)
+  Flushing queue: [('rabbit', None), ('elephant', None)]
+
+Let's try and index another item and flush the queue manually:
+  
+  >>> dispatcher.index('snake')
+  >>> dispatcher.flush()
+
+Since the queue is running in its own thread, we'll want to sleep for
+just a short while.
+
+  >>> time.sleep(0.1)
+  Flushing queue: [('snake', None)]  
+  
+Cleanup
+-------
+
+To be a good testing citizen, we cleanup our queue processing thread.
+  
+  >>> queue.QueueProcessor.stop()
+

Added: z3c.indexing.async/trunk/src/z3c/indexing/async/__init__.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/__init__.py	                        (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/__init__.py	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1 @@
+#

Added: z3c.indexing.async/trunk/src/z3c/indexing/async/dispatcher.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/dispatcher.py	                        (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/dispatcher.py	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,39 @@
+from zope import interface
+from zope import component
+
+from z3c.indexing.dispatch.interfaces import IDispatcher
+from z3c.indexing.dispatch import operation
+
+from queue import index_queue as queue
+
+class AsynchronousProcess(object):
+    def __init__(self, operation, dispatcher):
+        self.operation = operation
+        self.dispatcher = dispatcher
+
+    def dispatch(self):
+        self.operation.process(self.dispatcher)
+
+class AsynchronousDispatcher(object):
+    """Asynchronous indexing dispatcher."""
+
+    interface.implements(IDispatcher)
+
+    def index(self, obj, attributes=None):
+        self._enqueue(operation.Add(obj, attributes))
+        
+    def reindex(self, obj, attributes=None):
+        self._enqueue(operation.Modify(obj, attributes))
+
+    def unindex(self, obj):
+        self._enqueue(operation.Delete(obj, attributes))
+
+    def flush(self):
+        queue.put(None)
+
+    def _enqueue(self, op):
+        obj = op.obj
+        
+        for name, dispatcher in component.getAdapters((self, obj), IDispatcher):
+            process = AsynchronousProcess(op, dispatcher)
+            queue.put(process)

Added: z3c.indexing.async/trunk/src/z3c/indexing/async/queue.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/queue.py	                        (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/queue.py	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,81 @@
+import Queue, threading
+
+# we do async indexing with all indexing operations put into this queue
+index_queue = Queue.Queue()
+
+# async queue processor
+class QueueProcessor( object ):
+
+    # Flush every _n_ changes to the db
+    FLUSH_THRESHOLD = 20
+
+    # Flush every _n_ seconds since the last change
+    FLUSH_TIMEOUT = 60
+
+    indexer_running = False
+    indexer_thread = None
+
+    def __iter__(self):
+        # iterator never ends, just sleeps when no results to process
+        while self.indexer_running:
+            # get an operation in blocking fashion
+            try:
+                op = index_queue.get(True, self.FLUSH_TIMEOUT)
+            except Queue.Empty:
+                yield None
+            else:
+                yield op
+
+    def __call__(self):
+        # number of documents indexed since last flush
+        op_delta = 0
+        
+        dispatchers = set()
+
+        def flush():
+            for dispatcher in dispatchers:
+                dispatcher.flush()
+
+            dispatchers.clear()
+            
+        # loop through queue iteration
+        for process in self:
+
+            # on timeout the op is none
+            if process is None:
+                # if we indexed anything since the last flush, flush it now
+                if op_delta:
+                    flush()
+                    op_delta = 0
+                continue
+
+            # process the operation
+            process.dispatch()
+
+            # keep track of dispatcher
+            dispatchers.add(process.dispatcher)
+            
+            op_delta += 1
+            
+            if op_delta % self.FLUSH_THRESHOLD == 0:
+                flush()
+                op_delta = 0
+
+    @classmethod
+    def start(klass):
+        if klass.indexer_running:
+            raise SyntaxError("Indexer already running")
+        
+        klass.indexer_running = True
+        indexer = klass()
+        klass.indexer_thread = threading.Thread(target=indexer)
+        klass.indexer_thread.setDaemon(True)
+        klass.indexer_thread.start()
+        return indexer
+
+    @classmethod
+    def stop(klass):
+        if not klass.indexer_running:
+            return
+        klass.indexer_running = False
+        klass.indexer_thread.join()

Added: z3c.indexing.async/trunk/src/z3c/indexing/async/tests.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/tests.py	                        (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/tests.py	2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,17 @@
+from zope import interface
+from zope import component
+
+import unittest
+from zope.testing import doctest
+
+def test_suite():
+    globs = dict(interface=interface, component=component)
+    
+    return unittest.TestSuite((
+        doctest.DocFileSuite(
+        'README.txt',
+        globs=globs,
+        optionflags=doctest.NORMALIZE_WHITESPACE|doctest.ELLIPSIS,
+        ),    
+        ))
+



More information about the Checkins mailing list