[Checkins] SVN: zc.async/branches/dev/src/zc/async/ add subscriber tests; refactor subscribers a bit; add logs for dispatcher stop and start

Gary Poster gary at zope.com
Fri Apr 4 20:57:41 EDT 2008


Log message for revision 85109:
  add subscriber tests; refactor subscribers a bit; add logs for dispatcher stop and start

Changed:
  U   zc.async/branches/dev/src/zc/async/README_2.txt
  U   zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml
  U   zc.async/branches/dev/src/zc/async/dispatcher.py
  U   zc.async/branches/dev/src/zc/async/dispatcher.txt
  U   zc.async/branches/dev/src/zc/async/interfaces.py
  U   zc.async/branches/dev/src/zc/async/subscribers.py
  A   zc.async/branches/dev/src/zc/async/subscribers.txt
  U   zc.async/branches/dev/src/zc/async/tests.py

-=-
Modified: zc.async/branches/dev/src/zc/async/README_2.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README_2.txt	2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/README_2.txt	2008-04-05 00:57:29 UTC (rev 85109)
@@ -542,7 +542,7 @@
         A Python time zone library
     
     - rwproperty
-        A small package of desriptor conveniences
+        A small package of descriptor conveniences
     
     - uuid
         The uuid module included in Python 2.5
@@ -556,9 +556,8 @@
     - zc.twist
         Conveniences for working with Twisted and the ZODB
     
-    - zc.twisted
-        A setuptools-friendly Twisted distribution, hopefully to be replaced
-        with a normal Twisted distribution when it is ready.
+    - twisted
+        The Twisted internet library.
     
     - ZConfig
         A general configuration package coming from the Zope project with which

Modified: zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml
===================================================================
--- zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml	2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml	2008-04-05 00:57:29 UTC (rev 85109)
@@ -2,7 +2,7 @@
 <configure xmlns="http://namespaces.zope.org/zope">
     <include file="dispatcher.zcml" />
     <subscriber handler=".subscribers.queue_installer" />
-    <subscriber handler=".subscribers.installThreadedDispatcher" />
+    <subscriber handler=".subscribers.threaded_dispatcher_installer" />
     <subscriber handler=".subscribers.agent_installer" />
     <adapter factory="zc.async.queue.getDefaultQueue" />
 </configure>

Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py	2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py	2008-04-05 00:57:29 UTC (rev 85109)
@@ -451,6 +451,8 @@
     def activate(self, threaded=False):
         if self.activated:
             raise ValueError('already activated')
+        zc.async.utils.log.info('attempting to activate dispatcher %s',
+                                self.UUID)
         self.activated = datetime.datetime.utcnow()
         # in case this is a restart, we clear old data
         self.polls.clear()
@@ -489,6 +491,8 @@
                 self.dead_pools.append(queue_pools.pop(name))
         conn_delta -= 1
         self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
+        zc.async.utils.log.info('deactivated dispatcher %s',
+                                self.UUID)
 
     # these methods are used for monitoring and analysis
 

Modified: zc.async/branches/dev/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.txt	2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/dispatcher.txt	2008-04-05 00:57:29 UTC (rev 85109)
@@ -364,7 +364,7 @@
 
     >>> import ZODB.FileStorage
     >>> storage = ZODB.FileStorage.FileStorage(
-    ...     'HistoricalConnectionTests.fs', create=True)
+    ...     'main.fs', create=True)
     >>> from ZODB.DB import DB 
     >>> db = DB(storage) 
     >>> conn = db.open()

Modified: zc.async/branches/dev/src/zc/async/interfaces.py
===================================================================
--- zc.async/branches/dev/src/zc/async/interfaces.py	2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/interfaces.py	2008-04-05 00:57:29 UTC (rev 85109)
@@ -5,6 +5,25 @@
 import zc.queue.interfaces
 from zc.async.i18n import _
 
+# this is our only direct dependency on anything in zope.app, which is
+# only used by our convenience subscribers.  Since we don't really need this,
+# or zope.app, we make this import optional and provide some replacements if
+# necessary.
+try:
+    from zope.app.appsetup.interfaces import (IDatabaseOpenedEvent,
+                                              DatabaseOpened)
+except ImportError:
+    class IDatabaseOpenedEvent(zope.interface.Interface):
+        """The main database has been opened."""
+    
+        database = zope.interface.Attribute("The main database.")
+    
+    class DatabaseOpened(object):
+        zope.interface.implements(IDatabaseOpenedEvent)
+    
+        def __init__(self, database):
+            self.database = database
+
 # TODO: these interfaces are not particularly complete.  The other
 # documentation is more accurate at the moment.
 

Modified: zc.async/branches/dev/src/zc/async/subscribers.py
===================================================================
--- zc.async/branches/dev/src/zc/async/subscribers.py	2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/subscribers.py	2008-04-05 00:57:29 UTC (rev 85109)
@@ -1,10 +1,8 @@
 import threading
-import time
 import signal
 import transaction
 import twisted.internet.selectreactor
 import zope.component
-import zope.app.appsetup.interfaces
 import zc.twist
 
 import zc.async.interfaces
@@ -18,8 +16,9 @@
     def __init__(self, queues=('',),
                  factory=lambda *args: zc.async.queue.Queue(),
                  db_name=None):
-        zope.component.adapter(
-            zope.app.appsetup.interfaces.IDatabaseOpenedEvent)(self)
+        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
+        # package is around
+        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
         self.db_name = db_name
         self.factory = factory
         self.queues = queues
@@ -57,34 +56,57 @@
             conn.close()
 
 queue_installer = QueueInstaller()
+multidb_queue_installer = QueueInstaller(db_name='async')
 
-@zope.component.adapter(zope.app.appsetup.interfaces.IDatabaseOpenedEvent)
-def installThreadedDispatcher(ev):
-    reactor = twisted.internet.selectreactor.SelectReactor()
-    # reactor._handleSignals()
-    curr_sigint_handler = signal.getsignal(signal.SIGINT)
-    def sigint_handler(*args):
-        reactor.callFromThread(reactor.stop)
-        time.sleep(0.5) # bah, a guess, but Works For Me (So Far)
-        curr_sigint_handler(*args)
-    def handler(*args):
-        reactor.callFromThread(reactor.stop)
-    signal.signal(signal.SIGINT, sigint_handler)
-    signal.signal(signal.SIGTERM, handler)
-    # Catch Ctrl-Break in windows
-    if getattr(signal, "SIGBREAK", None) is not None:
-        signal.signal(signal.SIGBREAK, handler)
-    dispatcher = zc.async.dispatcher.Dispatcher(ev.database, reactor)
-    def start():
-        dispatcher.activate()
-        reactor.run(installSignalHandlers=0)
-    thread = threading.Thread(target=start)
-    thread.setDaemon(True)
-    thread.start()
+class ThreadedDispatcherInstaller(object):
+    def __init__(self,
+                 poll_interval=5,
+                 reactor_factory=twisted.internet.selectreactor.SelectReactor):
+        self.poll_interval = poll_interval
+        self.reactor_factory = reactor_factory
+        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
+        # package is around
+        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
 
+    def __call__(self, ev):
+        reactor = self.reactor_factory()
+        dispatcher = zc.async.dispatcher.Dispatcher(
+            ev.database, reactor, poll_interval=self.poll_interval)
+        def start():
+            dispatcher.activate()
+            reactor.run(installSignalHandlers=0)
+        thread = threading.Thread(target=start)
+        thread.setDaemon(True)
+        thread.start()
+    
+        # The above is really sufficient. This signal registration, below, is
+        # an optimization. The dispatcher, on its next run, will eventually
+        # figure out that it is looking at a previous incarnation of itself if
+        # these handlers don't get to clean up.
+        # We do this with signal handlers rather than atexit.register because
+        # we want to clean up before the database is closed, if possible. ZODB
+        # does not provide an appropriate hook itself as of this writing.
+        curr_sigint_handler = signal.getsignal(signal.SIGINT)
+        def sigint_handler(*args):
+            reactor.callFromThread(reactor.stop)
+            thread.join(3)
+            curr_sigint_handler(*args)
+    
+        def handler(*args):
+            reactor.callFromThread(reactor.stop)
+            raise SystemExit()
+    
+        signal.signal(signal.SIGINT, sigint_handler)
+        signal.signal(signal.SIGTERM, handler)
+        # Catch Ctrl-Break in windows
+        if getattr(signal, "SIGBREAK", None) is not None:
+            signal.signal(signal.SIGBREAK, handler)
+
+threaded_dispatcher_installer = ThreadedDispatcherInstaller()
+
 class AgentInstaller(object):
 
-    def __init__(self, agent_name='', chooser=None, size=3, queue_names=None):
+    def __init__(self, agent_name, chooser=None, size=3, queue_names=None):
         zope.component.adapter(
             zc.async.interfaces.IDispatcherActivated)(self)
         self.queue_names = queue_names

Added: zc.async/branches/dev/src/zc/async/subscribers.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/subscribers.txt	                        (rev 0)
+++ zc.async/branches/dev/src/zc/async/subscribers.txt	2008-04-05 00:57:29 UTC (rev 85109)
@@ -0,0 +1,240 @@
+The subscribers module provides several conveniences for starting and
+configuring zc.async.  Let's assume we have a database and all of the
+necessary adapters and utilities registered [#setUp]_.
+
+The first helper we'll discuss is ``threaded_dispatcher_installer``.  This can be
+used as a subscriber to a DatabaseOpened event, as defined by zope.app.appsetup
+if you are using it, and defined by zc.async.interfaces if you are not. It is
+an instance of ``ThreadedDispatcherInstaller``, which, as the name implies, is
+a class to create handlers that install a threaded dispatcher.
+
+We will install a dispatcher that polls a bit faster than the default five
+seconds, so that we can have an easier time in running this doctest.
+
+    >>> import zc.async.subscribers
+    >>> import zc.async.interfaces
+    >>> import zope.event
+    >>> import zope.component
+    >>> isinstance(zc.async.subscribers.threaded_dispatcher_installer,
+    ...            zc.async.subscribers.ThreadedDispatcherInstaller)
+    True
+    >>> zc.async.subscribers.threaded_dispatcher_installer.poll_interval
+    5
+    >>> threaded_installer = zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...     poll_interval=0.5)
+    >>> zope.component.provideHandler(threaded_installer)
+    >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+
+Now a dispatcher is installed and running.  (The get_poll helper is defined in
+the first footnote.)
+
+    >>> import zc.async.dispatcher
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> dispatcher.poll_interval
+    0.5
+    >>> get_poll(0)
+    {}
+
+The function also installs some signal handlers to optimize shutdown.  We'll
+look at them soon.  For now, let's install some queues.
+
+The subscribers module also includes helpers to install a queues collection
+and zero or more queues.  The QueueInstaller class lets you specify an
+iterable of names of queues to install, defaulting to ('',); a factory to
+generate queues, defaulting to something that generates a zc.async.queue.Queue;
+and a db_name if the queues collection should be placed in another database of
+the given name, for a multi-database setup, defaulting to None, indicating that
+the queues should be placed in the same database.
+
+Two instances of this class are already instantiated in the module; one with
+the defaults, and one specifying an additional database.
+
+    >>> isinstance(zc.async.subscribers.queue_installer,
+    ...            zc.async.subscribers.QueueInstaller)
+    True
+    >>> zc.async.subscribers.queue_installer.queues
+    ('',)
+    >>> print zc.async.subscribers.queue_installer.db_name
+    None
+    >>> isinstance(zc.async.subscribers.multidb_queue_installer,
+    ...            zc.async.subscribers.QueueInstaller)
+    True
+    >>> zc.async.subscribers.multidb_queue_installer.queues
+    ('',)
+    >>> zc.async.subscribers.multidb_queue_installer.db_name
+    'async'
+
+Let's try the multidb variation out.  We'll need another database, and the
+proper data structure set up on the two of them.  The first footnote of this
+file sets the necessary data structures up.
+
+The subscribers generated by this class expect to get the same event we fired
+above, an IDatabaseOpenedEvent. Normally only one of these events fires, since
+the database generally opens once, but for the purposes of our example we will
+fire it again in a moment.
+
+While we're at it, we'll use the other handler: ``AgentInstaller``.  This
+class generates a subscriber that installs agents in the queues it finds when
+dispatcher agent activation events fire.  You must specify an agent name to
+use; and can specify a chooser (a way to choose the tasks this agent should
+perform), a size (the number of concurrent jobs this agent should hand out),
+and specific queue names in which the agent should be installed, defaulting to
+None, or all queues.
+
+The agent_installer installs an agent named 'main' for the active dispatcher
+in all queues, with a default FIFO chooser.
+
+    >>> isinstance(zc.async.subscribers.agent_installer,
+    ...            zc.async.subscribers.AgentInstaller)
+    True
+    >>> zc.async.subscribers.agent_installer.agent_name
+    'main'
+    >>> print zc.async.subscribers.agent_installer.queue_names
+    None
+    >>> print zc.async.subscribers.agent_installer.chooser
+    None
+    >>> zc.async.subscribers.agent_installer.size
+    3
+
+Now we can install the subscribers and give it a try.  As we said above,
+normally the database opened event only fires once; this is just for the purpose of
+demonstration.  We unregister the previous handler so nothing gets confused.
+
+    >>> zope.component.getGlobalSiteManager().unregisterHandler(
+    ...     threaded_installer)
+    True
+    >>> zope.component.provideHandler(
+    ...     zc.async.subscribers.multidb_queue_installer)
+    >>> zope.component.provideHandler(
+    ...     zc.async.subscribers.agent_installer)
+    >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+
+Now if we look in the database, we'll find a queues collection in another
+database, with a queue, with a dispatcher, with an agent.
+
+    >>> import pprint
+    >>> pprint.pprint(get_poll())
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
+    >>> conn = db.open()
+    >>> root = conn.root()
+    >>> root._p_jar is conn
+    True
+    >>> queues = root[zc.async.interfaces.KEY]
+    >>> root[zc.async.interfaces.KEY]._p_jar is conn
+    False
+    >>> queues.keys()
+    ['']
+    >>> queue = queues['']
+    >>> len(queue.dispatchers)
+    1
+    >>> da = queue.dispatchers.values()[0]
+    >>> list(da)
+    ['main']
+    >>> bool(da.activated)
+    True
+
+Finally, we mentioned at the start that the threaded dispatcher installer also
+installed some signal handlers.  Let's show a SIGINT (CTRL-C, usually), and
+how it deactivates the dispatcher's agents collection in the ZODB.
+
+    >>> import signal
+    >>> import os
+    >>> if getattr(os, 'getpid', None) is not None: # UNIXEN, not Windows
+    ...     pid = os.getpid()
+    ...     try:
+    ...         os.kill(pid, signal.SIGINT)
+    ...     except KeyboardInterrupt:
+    ...         if dispatcher.activated:
+    ...             assert False, 'dispatcher did not deactivate'
+    ...     else:
+    ...         print "failed to send SIGINT, or something"
+    ... else:
+    ...     dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+    ...     for i in range(30):
+    ...         if not dispatcher.activated:
+    ...             break
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         assert False, 'dispatcher did not deactivate'
+    ...
+    >>> import transaction
+    >>> t = transaction.begin() # sync
+    >>> bool(da.activated)
+    False
+
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
+.. [#setUp] Below we set up a database, provide the adapters and utilities
+    that the code expects, and then define some helper functions we'll use in
+    the examples.  See README_2 for a discussion of what is going on with the
+    configuration.
+
+    >>> databases = {}
+    >>> import ZODB.FileStorage
+    >>> storage = ZODB.FileStorage.FileStorage(
+    ...     'main.fs', create=True)
+    
+    >>> async_storage = ZODB.FileStorage.FileStorage(
+    ...     'async.fs', create=True)
+
+    >>> from ZODB.DB import DB 
+    >>> databases[''] = db = DB(storage)
+    >>> databases['async'] = async_db = DB(async_storage)
+    >>> async_db.databases = db.databases = databases
+    >>> db.database_name = ''
+    >>> async_db.database_name = 'async'
+
+    >>> from zc.twist import transactionManager, connection
+    >>> import zope.component
+    >>> zope.component.provideAdapter(transactionManager)
+    >>> zope.component.provideAdapter(connection)
+    >>> import ZODB.interfaces
+    >>> zope.component.provideAdapter(
+    ...     transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+    >>> import zope.component
+    >>> import types
+    >>> import zc.async.interfaces
+    >>> import zc.async.job
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.Job,
+    ...     adapts=(types.FunctionType,),
+    ...     provides=zc.async.interfaces.IJob)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.Job,
+    ...     adapts=(types.MethodType,),
+    ...     provides=zc.async.interfaces.IJob)
+    ...
+
+    >>> from zc.async.instanceuuid import UUID
+    >>> zope.component.provideUtility(
+    ...     UUID, zc.async.interfaces.IUUID, '')
+
+    >>> import time
+    >>> def get_poll(count = None):
+    ...     if count is None:
+    ...         count = len(dispatcher.polls)
+    ...     for i in range(30):
+    ...         if len(dispatcher.polls) > count:
+    ...             return dispatcher.polls.first()
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         assert False, 'no poll!'
+    ... 
+
+    >>> import zc.async.interfaces
+    >>> def wait_for_result(job):
+    ...     for i in range(30):
+    ...         t = transaction.begin()
+    ...         if job.status == zc.async.interfaces.COMPLETED:
+    ...             return job.result
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         assert False, 'job never completed'
+    ...
\ No newline at end of file


Property changes on: zc.async/branches/dev/src/zc/async/subscribers.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: zc.async/branches/dev/src/zc/async/tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/tests.py	2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/tests.py	2008-04-05 00:57:29 UTC (rev 85109)
@@ -33,6 +33,8 @@
     zc.async.testing.tearDownDatetime()
     module.tearDown(test)
     zope.component.testing.tearDown(test)
+    import signal
+    signal.signal(signal.SIGINT, signal.default_int_handler)
     if 'storage' in test.globs:
         test.globs['db'].close()
         test.globs['storage'].close()
@@ -116,6 +118,7 @@
             'queue.txt',
             'agent.txt',
             'dispatcher.txt',
+            'subscribers.txt',
             'README.txt',
             'README_2.txt',
             setUp=modSetUp, tearDown=modTearDown,



More information about the Checkins mailing list