[Checkins] SVN: zc.async/branches/dev/src/zc/async/ add subscriber
tests; refactor subscribers a bit;
add logs for dispatcher stop and start
Gary Poster
gary at zope.com
Fri Apr 4 20:57:41 EDT 2008
Log message for revision 85109:
add subscriber tests; refactor subscribers a bit; add logs for dispatcher stop and start
Changed:
U zc.async/branches/dev/src/zc/async/README_2.txt
U zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml
U zc.async/branches/dev/src/zc/async/dispatcher.py
U zc.async/branches/dev/src/zc/async/dispatcher.txt
U zc.async/branches/dev/src/zc/async/interfaces.py
U zc.async/branches/dev/src/zc/async/subscribers.py
A zc.async/branches/dev/src/zc/async/subscribers.txt
U zc.async/branches/dev/src/zc/async/tests.py
-=-
Modified: zc.async/branches/dev/src/zc/async/README_2.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README_2.txt 2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/README_2.txt 2008-04-05 00:57:29 UTC (rev 85109)
@@ -542,7 +542,7 @@
A Python time zone library
- rwproperty
- A small package of desriptor conveniences
+ A small package of descriptor conveniences
- uuid
The uuid module included in Python 2.5
@@ -556,9 +556,8 @@
- zc.twist
Conveniences for working with Twisted and the ZODB
- - zc.twisted
- A setuptools-friendly Twisted distribution, hopefully to be replaced
- with a normal Twisted distribution when it is ready.
+ - twisted
+ The Twisted internet library.
- ZConfig
A general configuration package coming from the Zope project with which
Modified: zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml
===================================================================
--- zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml 2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml 2008-04-05 00:57:29 UTC (rev 85109)
@@ -2,7 +2,7 @@
<configure xmlns="http://namespaces.zope.org/zope">
<include file="dispatcher.zcml" />
<subscriber handler=".subscribers.queue_installer" />
- <subscriber handler=".subscribers.installThreadedDispatcher" />
+ <subscriber handler=".subscribers.threaded_dispatcher_installer" />
<subscriber handler=".subscribers.agent_installer" />
<adapter factory="zc.async.queue.getDefaultQueue" />
</configure>
Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py 2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py 2008-04-05 00:57:29 UTC (rev 85109)
@@ -451,6 +451,8 @@
def activate(self, threaded=False):
if self.activated:
raise ValueError('already activated')
+ zc.async.utils.log.info('attempting to activate dispatcher %s',
+ self.UUID)
self.activated = datetime.datetime.utcnow()
# in case this is a restart, we clear old data
self.polls.clear()
@@ -489,6 +491,8 @@
self.dead_pools.append(queue_pools.pop(name))
conn_delta -= 1
self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
+ zc.async.utils.log.info('deactivated dispatcher %s',
+ self.UUID)
# these methods are used for monitoring and analysis
Modified: zc.async/branches/dev/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.txt 2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/dispatcher.txt 2008-04-05 00:57:29 UTC (rev 85109)
@@ -364,7 +364,7 @@
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
- ... 'HistoricalConnectionTests.fs', create=True)
+ ... 'main.fs', create=True)
>>> from ZODB.DB import DB
>>> db = DB(storage)
>>> conn = db.open()
Modified: zc.async/branches/dev/src/zc/async/interfaces.py
===================================================================
--- zc.async/branches/dev/src/zc/async/interfaces.py 2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/interfaces.py 2008-04-05 00:57:29 UTC (rev 85109)
@@ -5,6 +5,25 @@
import zc.queue.interfaces
from zc.async.i18n import _
+# this is our only direct dependency on anything in zope.app, which is
+# only used by our convenience subscribers. Since we don't really need this,
+# or zope.app, we make this import optional and provide some replacements if
+# necessary.
+try:
+ from zope.app.appsetup.interfaces import (IDatabaseOpenedEvent,
+ DatabaseOpened)
+except ImportError:
+ class IDatabaseOpenedEvent(zope.interface.Interface):
+ """The main database has been opened."""
+
+ database = zope.interface.Attribute("The main database.")
+
+ class DatabaseOpened(object):
+ zope.interface.implements(IDatabaseOpenedEvent)
+
+ def __init__(self, database):
+ self.database = database
+
# TODO: these interfaces are not particularly complete. The other
# documentation is more accurate at the moment.
Modified: zc.async/branches/dev/src/zc/async/subscribers.py
===================================================================
--- zc.async/branches/dev/src/zc/async/subscribers.py 2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/subscribers.py 2008-04-05 00:57:29 UTC (rev 85109)
@@ -1,10 +1,8 @@
import threading
-import time
import signal
import transaction
import twisted.internet.selectreactor
import zope.component
-import zope.app.appsetup.interfaces
import zc.twist
import zc.async.interfaces
@@ -18,8 +16,9 @@
def __init__(self, queues=('',),
factory=lambda *args: zc.async.queue.Queue(),
db_name=None):
- zope.component.adapter(
- zope.app.appsetup.interfaces.IDatabaseOpenedEvent)(self)
+ # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
+ # package is around
+ zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
self.db_name = db_name
self.factory = factory
self.queues = queues
@@ -57,34 +56,57 @@
conn.close()
queue_installer = QueueInstaller()
+multidb_queue_installer = QueueInstaller(db_name='async')
- at zope.component.adapter(zope.app.appsetup.interfaces.IDatabaseOpenedEvent)
-def installThreadedDispatcher(ev):
- reactor = twisted.internet.selectreactor.SelectReactor()
- # reactor._handleSignals()
- curr_sigint_handler = signal.getsignal(signal.SIGINT)
- def sigint_handler(*args):
- reactor.callFromThread(reactor.stop)
- time.sleep(0.5) # bah, a guess, but Works For Me (So Far)
- curr_sigint_handler(*args)
- def handler(*args):
- reactor.callFromThread(reactor.stop)
- signal.signal(signal.SIGINT, sigint_handler)
- signal.signal(signal.SIGTERM, handler)
- # Catch Ctrl-Break in windows
- if getattr(signal, "SIGBREAK", None) is not None:
- signal.signal(signal.SIGBREAK, handler)
- dispatcher = zc.async.dispatcher.Dispatcher(ev.database, reactor)
- def start():
- dispatcher.activate()
- reactor.run(installSignalHandlers=0)
- thread = threading.Thread(target=start)
- thread.setDaemon(True)
- thread.start()
+class ThreadedDispatcherInstaller(object):
+ def __init__(self,
+ poll_interval=5,
+ reactor_factory=twisted.internet.selectreactor.SelectReactor):
+ self.poll_interval = poll_interval
+ self.reactor_factory = reactor_factory
+ # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
+ # package is around
+ zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
+ def __call__(self, ev):
+ reactor = self.reactor_factory()
+ dispatcher = zc.async.dispatcher.Dispatcher(
+ ev.database, reactor, poll_interval=self.poll_interval)
+ def start():
+ dispatcher.activate()
+ reactor.run(installSignalHandlers=0)
+ thread = threading.Thread(target=start)
+ thread.setDaemon(True)
+ thread.start()
+
+ # The above is really sufficient. This signal registration, below, is
+ # an optimization. The dispatcher, on its next run, will eventually
+ # figure out that it is looking at a previous incarnation of itself if
+ # these handlers don't get to clean up.
+ # We do this with signal handlers rather than atexit.register because
+ # we want to clean up before the database is closed, if possible. ZODB
+ # does not provide an appropriate hook itself as of this writing.
+ curr_sigint_handler = signal.getsignal(signal.SIGINT)
+ def sigint_handler(*args):
+ reactor.callFromThread(reactor.stop)
+ thread.join(3)
+ curr_sigint_handler(*args)
+
+ def handler(*args):
+ reactor.callFromThread(reactor.stop)
+ raise SystemExit()
+
+ signal.signal(signal.SIGINT, sigint_handler)
+ signal.signal(signal.SIGTERM, handler)
+ # Catch Ctrl-Break in windows
+ if getattr(signal, "SIGBREAK", None) is not None:
+ signal.signal(signal.SIGBREAK, handler)
+
+threaded_dispatcher_installer = ThreadedDispatcherInstaller()
+
class AgentInstaller(object):
- def __init__(self, agent_name='', chooser=None, size=3, queue_names=None):
+ def __init__(self, agent_name, chooser=None, size=3, queue_names=None):
zope.component.adapter(
zc.async.interfaces.IDispatcherActivated)(self)
self.queue_names = queue_names
Added: zc.async/branches/dev/src/zc/async/subscribers.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/subscribers.txt (rev 0)
+++ zc.async/branches/dev/src/zc/async/subscribers.txt 2008-04-05 00:57:29 UTC (rev 85109)
@@ -0,0 +1,240 @@
+The subscribers module provides several conveniences for starting and
+configuring zc.async. Let's assume we have a database and all of the
+necessary adapters and utilities registered[#setUp]_.
+
+The first helper we'll discuss is ``threaded_dispatcher_installer``. This can be
+used as a subscriber to a DatabaseOpened event, as defined by zope.app.appsetup
+if you are using it, and defined by zc.async.interfaces if you are not. It is
+an instance of ``ThreadedDispatcherInstaller``, which, as the name implies, is
+a class to create handlers that install a threaded dispatcher.
+
+We will install a dispatcher that polls a bit faster than the default five
+seconds, so that we can have an easier time in running this doctest.
+
+ >>> import zc.async.subscribers
+ >>> import zc.async.interfaces
+ >>> import zope.event
+ >>> import zope.component
+ >>> isinstance(zc.async.subscribers.threaded_dispatcher_installer,
+ ... zc.async.subscribers.ThreadedDispatcherInstaller)
+ True
+ >>> zc.async.subscribers.threaded_dispatcher_installer.poll_interval
+ 5
+ >>> threaded_installer = zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.5)
+ >>> zope.component.provideHandler(threaded_installer)
+ >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+
+Now a dispatcher is installed and running. (The get_poll helper is defined in
+the first footnote.)
+
+ >>> import zc.async.dispatcher
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> dispatcher.poll_interval
+ 0.5
+ >>> get_poll(0)
+ {}
+
+The function also installs some signal handlers to optimize shutdown. We'll
+look at them soon. For now, let's install some queues.
+
+The subscribers module also includes helpers to install a queues collection
+and zero or more queues. The QueueInstaller class lets you specify an
+iterable of names of queues to install, defaulting to ('',); a factory to
+generate queues, defaulting to something that generates a zc.async.queue.Queue;
+and a db_name if the queues collection should be placed in another database of
+the given name, for a multi-database setup, defaulting to None, indicating that
+the queues should be placed in the same database.
+
+Two instances of this class are already instantiated in the module; one with
+the defaults, and one specifying an additional database.
+
+ >>> isinstance(zc.async.subscribers.queue_installer,
+ ... zc.async.subscribers.QueueInstaller)
+ True
+ >>> zc.async.subscribers.queue_installer.queues
+ ('',)
+ >>> print zc.async.subscribers.queue_installer.db_name
+ None
+ >>> isinstance(zc.async.subscribers.multidb_queue_installer,
+ ... zc.async.subscribers.QueueInstaller)
+ True
+ >>> zc.async.subscribers.multidb_queue_installer.queues
+ ('',)
+ >>> zc.async.subscribers.multidb_queue_installer.db_name
+ 'async'
+
+Let's try the multidb variation out. We'll need another database, and the
+proper data structure set up on the two of them. The first footnote of this
+file sets the necessary data structures up.
+
+The subscribers generated by this class expect to get the same event we fired
+above, an IDatabaseOpenedEvent. Normally only one of these events fires, since
+the database generally opens once, but for the purposes of our example we will
+fire it again in a moment.
+
+While we're at it, we'll use the other handler: ``AgentInstaller``. This
+class generates a subscriber that installs agents in the queues it finds when
+dispatcher agent activation events fire. You must specify an agent name to
+use; and can specify a chooser (a way to choose the tasks this agent should
+perform), a size (the number of concurrent jobs this agent should hand out),
+and specific queue names in which the agent should be installed, defaulting to
+None, or all queues.
+
+The agent_installer installs an agent named 'main' for the active dispatcher
+in all queues, with a default FIFO chooser.
+
+ >>> isinstance(zc.async.subscribers.agent_installer,
+ ... zc.async.subscribers.AgentInstaller)
+ True
+ >>> zc.async.subscribers.agent_installer.agent_name
+ 'main'
+ >>> print zc.async.subscribers.agent_installer.queue_names
+ None
+ >>> print zc.async.subscribers.agent_installer.chooser
+ None
+ >>> zc.async.subscribers.agent_installer.size
+ 3
+
+Now we can install the subscribers and give it a try. As we said above,
+normally the database opened event only fires once; this is just for purpose of
+demonstration. We unregister the previous handler so nothing gets confused.
+
+ >>> zope.component.getGlobalSiteManager().unregisterHandler(
+ ... threaded_installer)
+ True
+ >>> zope.component.provideHandler(
+ ... zc.async.subscribers.multidb_queue_installer)
+ >>> zope.component.provideHandler(
+ ... zc.async.subscribers.agent_installer)
+ >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+
+Now if we look in the database, we'll find a queues collection in another
+database, with a queue, with a dispatcher, with an agent.
+
+ >>> import pprint
+ >>> pprint.pprint(get_poll())
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
+ >>> conn = db.open()
+ >>> root = conn.root()
+ >>> root._p_jar is conn
+ True
+ >>> queues = root[zc.async.interfaces.KEY]
+ >>> root[zc.async.interfaces.KEY]._p_jar is conn
+ False
+ >>> queues.keys()
+ ['']
+ >>> queue = queues['']
+ >>> len(queue.dispatchers)
+ 1
+ >>> da = queue.dispatchers.values()[0]
+ >>> list(da)
+ ['main']
+ >>> bool(da.activated)
+ True
+
+Finally, we mentioned at the start that the threaded dispatcher installer also
+installed some signal handlers. Let's show a SIGINT (CTRL-C, usually), and
+how it deactivates the dispatcher's agents collection in the ZODB.
+
+ >>> import signal
+ >>> import os
+ >>> if getattr(os, 'getpid', None) is not None: # UNIXEN, not Windows
+ ... pid = os.getpid()
+ ... try:
+ ... os.kill(pid, signal.SIGINT)
+ ... except KeyboardInterrupt:
+ ... if dispatcher.activated:
+ ... assert False, 'dispatcher did not deactivate'
+ ... else:
+ ... print "failed to send SIGINT, or something"
+ ... else:
+ ... dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ ... for i in range(30):
+ ... if not dispatcher.activated:
+ ... break
+ ... time.sleep(0.1)
+ ... else:
+ ... assert False, 'dispatcher did not deactivate'
+ ...
+ >>> import transaction
+ >>> t = transaction.begin() # sync
+ >>> bool(da.activated)
+ False
+
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
+.. [#setUp] Below we set up a database, provide the adapters and utilities
+ that the code expects, and then define some helper functions we'll use in
+ the examples. See README_2 for a discussion of what is going on with the
+ configuration.
+
+ >>> databases = {}
+ >>> import ZODB.FileStorage
+ >>> storage = ZODB.FileStorage.FileStorage(
+ ... 'main.fs', create=True)
+
+ >>> async_storage = ZODB.FileStorage.FileStorage(
+ ... 'async.fs', create=True)
+
+ >>> from ZODB.DB import DB
+ >>> databases[''] = db = DB(storage)
+ >>> databases['async'] = async_db = DB(async_storage)
+ >>> async_db.databases = db.databases = databases
+ >>> db.database_name = ''
+ >>> async_db.database_name = 'async'
+
+ >>> from zc.twist import transactionManager, connection
+ >>> import zope.component
+ >>> zope.component.provideAdapter(transactionManager)
+ >>> zope.component.provideAdapter(connection)
+ >>> import ZODB.interfaces
+ >>> zope.component.provideAdapter(
+ ... transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+ >>> import zope.component
+ >>> import types
+ >>> import zc.async.interfaces
+ >>> import zc.async.job
+ >>> zope.component.provideAdapter(
+ ... zc.async.job.Job,
+ ... adapts=(types.FunctionType,),
+ ... provides=zc.async.interfaces.IJob)
+ >>> zope.component.provideAdapter(
+ ... zc.async.job.Job,
+ ... adapts=(types.MethodType,),
+ ... provides=zc.async.interfaces.IJob)
+ ...
+
+ >>> from zc.async.instanceuuid import UUID
+ >>> zope.component.provideUtility(
+ ... UUID, zc.async.interfaces.IUUID, '')
+
+ >>> import time
+ >>> def get_poll(count = None):
+ ... if count is None:
+ ... count = len(dispatcher.polls)
+ ... for i in range(30):
+ ... if len(dispatcher.polls) > count:
+ ... return dispatcher.polls.first()
+ ... time.sleep(0.1)
+ ... else:
+ ... assert False, 'no poll!'
+ ...
+
+ >>> import zc.async.interfaces
+ >>> def wait_for_result(job):
+ ... for i in range(30):
+ ... t = transaction.begin()
+ ... if job.status == zc.async.interfaces.COMPLETED:
+ ... return job.result
+ ... time.sleep(0.1)
+ ... else:
+ ... assert False, 'job never completed'
+ ...
\ No newline at end of file
Property changes on: zc.async/branches/dev/src/zc/async/subscribers.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: zc.async/branches/dev/src/zc/async/tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/tests.py 2008-04-04 21:32:25 UTC (rev 85108)
+++ zc.async/branches/dev/src/zc/async/tests.py 2008-04-05 00:57:29 UTC (rev 85109)
@@ -33,6 +33,8 @@
zc.async.testing.tearDownDatetime()
module.tearDown(test)
zope.component.testing.tearDown(test)
+ import signal
+ signal.signal(signal.SIGINT, signal.default_int_handler)
if 'storage' in test.globs:
test.globs['db'].close()
test.globs['storage'].close()
@@ -116,6 +118,7 @@
'queue.txt',
'agent.txt',
'dispatcher.txt',
+ 'subscribers.txt',
'README.txt',
'README_2.txt',
setUp=modSetUp, tearDown=modTearDown,
More information about the Checkins
mailing list