[Checkins] SVN: zc.async/trunk/s Multiple small changes.
Gary Poster
gary at zope.com
Wed Apr 23 16:26:33 EDT 2008
Log message for revision 85665:
Multiple small changes.
- Callbacks are logged at start in the trace log.
- All job results (including callbacks) are logged, including verbose
tracebacks if the callback generated a failure.
- Had the ThreadedDispatcherInstaller subscriber stash the thread on the
dispatcher, so you can shut down tests like this:
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
>>> dispatcher.thread.join(3)
- Added ``getQueue`` to zc.async.local as a convenience (it does what you
could already do: ``zc.async.local.getJob().queue``).
- Clarified in the interfaces and README that ``IQueue.pull`` is the approved
way of removing scheduled jobs from a queue.
- Reports in the logs of a job's success or failure now come before callbacks
are started.
- Added a section showing how the basic_dispatcher_policy.zcml worked, which
then pushed the former README_3 examples into README_3b.
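
To illustrate the thread-stashing item above outside of zc.async, here is a minimal, self-contained sketch of the same pattern using only the standard library. Everything in it is hypothetical: ``SketchDispatcher``, ``install``, ``run`` and ``stop`` are stand-ins invented for this example (``run`` plays the role of ``reactor.run`` and ``stop`` the role of ``reactor.callFromThread(reactor.stop)``); it is not the zc.async implementation.

```python
import threading


class SketchDispatcher(object):
    """Hypothetical stand-in for a dispatcher; not the zc.async class."""

    thread = None  # placeholder, filled in by whoever starts the loop

    def __init__(self):
        self._stop = threading.Event()

    def run(self):
        # Stand-in for reactor.run(): idle in short waits until asked to stop.
        while not self._stop.wait(0.01):
            pass

    def stop(self):
        # Stand-in for reactor.callFromThread(reactor.stop).
        self._stop.set()


def install(dispatcher):
    # Stash the thread on the dispatcher before starting it, so later code
    # (for example a test's tear-down) can find and join it.
    dispatcher.thread = thread = threading.Thread(target=dispatcher.run)
    thread.daemon = True
    thread.start()
    return dispatcher


dispatcher = install(SketchDispatcher())
dispatcher.stop()
dispatcher.thread.join(3)  # bounded wait, mirroring the recipe above
stopped_cleanly = not dispatcher.thread.is_alive()
```

The point of the bounded ``join(3)`` is that a test's tear-down waits for the loop to finish but cannot hang forever if it does not; checking ``is_alive()`` afterwards tells you which of the two happened.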
Changed:
U zc.async/trunk/setup.py
U zc.async/trunk/src/zc/async/CHANGES.txt
U zc.async/trunk/src/zc/async/README.txt
U zc.async/trunk/src/zc/async/README_3.txt
A zc.async/trunk/src/zc/async/README_3b.txt
U zc.async/trunk/src/zc/async/TODO.txt
U zc.async/trunk/src/zc/async/dispatcher.py
U zc.async/trunk/src/zc/async/dispatcher.txt
U zc.async/trunk/src/zc/async/interfaces.py
U zc.async/trunk/src/zc/async/job.py
U zc.async/trunk/src/zc/async/job.txt
U zc.async/trunk/src/zc/async/queue.py
U zc.async/trunk/src/zc/async/subscribers.py
U zc.async/trunk/src/zc/async/subscribers.txt
U zc.async/trunk/src/zc/async/z3tests.py
-=-
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/setup.py 2008-04-23 20:26:32 UTC (rev 85665)
@@ -5,7 +5,8 @@
long_description = (
open('src/zc/async/README.txt').read() + "\n" +
open('src/zc/async/README_2.txt').read() + "\n" +
- open('src/zc/async/README_3.txt').read() +
+ open('src/zc/async/README_3.txt').read() + "\n" +
+ open('src/zc/async/README_3b.txt').read() +
"\n\n=======\nChanges\n=======\n\n" +
open('src/zc/async/CHANGES.txt').read() + "\n")
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/CHANGES.txt 2008-04-23 20:26:32 UTC (rev 85665)
@@ -10,9 +10,31 @@
- README improved (thanks to Benji York and Sebastian Ware).
-- Callbacks are logged at start and finish in the trace log, including
- verbose tracebacks if the callback generated a failure.
+- Callbacks are logged at start in the trace log.
+- All job results (including callbacks) are logged, including verbose
+ tracebacks if the callback generated a failure.
+
+- Had the ThreadedDispatcherInstaller subscriber stash the thread on the
+ dispatcher, so you can shut down tests like this:
+
+ >>> import zc.async.dispatcher
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ >>> dispatcher.thread.join(3)
+
+- Added ``getQueue`` to zc.async.local as a convenience (it does what you
+ could already do: ``zc.async.local.getJob().queue``).
+
+- Clarified that ``IQueue.pull`` is the approved way of removing scheduled jobs
+ from a queue in interfaces and README.
+
+- reports in the logs of a job's success or failure come before callbacks are
+ started.
+
+- Added a section showing how the basic_dispatcher_policy.zcml worked, which
+ then pushed the former README_3 examples into README_3b.
+
1.0 (2008-04-09)
================
Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/README.txt 2008-04-23 20:26:32 UTC (rev 85665)
@@ -185,9 +185,9 @@
['']
>>> queue = queues['']
----------
-queue.put
----------
+-------------
+``queue.put``
+-------------
Now we want to actually get some work done. The simplest case is simple
to perform: pass a persistable callable to the queue's ``put`` method and
@@ -244,12 +244,29 @@
lambdas and other functions created dynamically. As we'll see below, the job
instance can help us out there somewhat by offering closure-like features.
+--------------
+``queue.pull``
+--------------
+
+If you put a job into a queue and it hasn't been claimed yet and you want to
+cancel the job, ``pull`` it from the queue.
+
+ >>> len(queue)
+ 0
+ >>> job = queue.put(send_message)
+ >>> len(queue)
+ 1
+ >>> job is queue.pull()
+ True
+ >>> len(queue)
+ 0
+
---------------
Scheduled Calls
---------------
-You can also pass a datetime.datetime to schedule a call. A datetime
-without a timezone is considered to be in the UTC timezone.
+When using ``put``, you can also pass a datetime.datetime to schedule a call. A
+datetime without a timezone is considered to be in the UTC timezone.
>>> t = transaction.begin()
>>> import datetime
@@ -281,10 +298,10 @@
has already timed out, in which case the job fails with an
abort [#already_passed_timed_out]_.
-The queue's `put` method is the essential API. Other methods are used
-to introspect, but are not needed for basic usage.
+The queue's ``put`` method is the essential API. ``pull`` is used rarely. Other
+methods are used to introspect, but are not needed for basic usage.
-But what is that result of the `put` call in the examples above? A
+But what is that result of the ``put`` call in the examples above? A
job? What do you do with that?
Jobs
Modified: zc.async/trunk/src/zc/async/README_3.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_3.txt 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/README_3.txt 2008-04-23 20:26:32 UTC (rev 85665)
@@ -7,12 +7,20 @@
conveniences to automate much of the configuration shown in the section
discussing configuration without Zope 3.
-If you want to set up a client alone, without a dispatcher, include
-configure.zcml, make sure you share the database in which the queues will be
-held, and make sure that either the
-zope.app.keyreference.persistent.connectionOfPersistent adapter is registered,
-or zc.twist.connection.
+Client Set Up
+=============
+If you want to set up a client alone, without a dispatcher, include the egg in
+your setup.py, include the configure.zcml in your application's zcml, make sure
+you share the database in which the queues will be held, and make sure that
+either the zope.app.keyreference.persistent.connectionOfPersistent adapter is
+registered, or zc.twist.connection.
+
+That should be it.
+
+Client/Server Set Up
+====================
+
For a client/server combination, use zcml that is something like the
basic_dispatcher_policy.zcml, make sure you have access to the database with
the queues, configure logging and monitoring as desired, configure the
@@ -30,9 +38,21 @@
the comments in the files that we set up, are the primary differences between
our examples and a real set up.
+We'll do this in two versions. The first version uses a single database, as
+you might do to get started quickly, or for a small site. The second version
+has one database for the main application, and one database for the async data,
+as will be more appropriate for typical production usage.
+
+-----------------------------
+Shared Single Database Set Up
+-----------------------------
+
+As described above, using a shared single database will probably be the
+quickest way to get started. Large-scale production usage will probably prefer
+to use the `Two Database Set Up`_ described later.
+
So, without further ado, here is the text of our zope.conf-alike, and of our
-site.zcml-alike [#get_vals]_. We'll be using two databases for this example,
-as you might want for a site with a fair amount of zc.async usage.
+site.zcml-alike [#get_vals]_.
>>> zope_conf = """
... site-definition %(site_zcml_file)s
@@ -44,13 +64,6 @@
... </filestorage>
... </zodb>
...
- ... <zodb async>
- ... <filestorage>
- ... create true
- ... path %(async_storage_path)s
- ... </filestorage>
- ... </zodb>
- ...
... <product-config zc.z3monitor>
... port %(monitor_port)s
... </product-config>
@@ -95,7 +108,7 @@
...
In a non-trivial production system of you will also probably want to replace
-the two file storages with two <zeoclient> stanzas.
+the file storage with a <zeoclient> stanza.
Also note that an open monitor port should be behind a firewall, of course.
@@ -123,7 +136,7 @@
... <include package="zope.component" file="meta.zcml" />
... <include package="zope.component" />
... <include package="zc.z3monitor" />
- ... <include package="zc.async" file="multidb_dispatcher_policy.zcml" />
+ ... <include package="zc.async" file="basic_dispatcher_policy.zcml" />
...
... <!-- this is usually handled in Zope applications by the
... zope.app.keyreference.persistent.connectionOfPersistent adapter -->
@@ -133,11 +146,6 @@
Now we're done.
-If you want to change policy, change "multidb_dispatcher_policy.zcml" to
-"dispatcher.zcml" in the example above and register your replacement bits for
-the policy in "multidb_dispatcher_policy.zcml". You'll see that most of that
-comes from code in subscribers.py, which can be adjusted easily.
-
If we process these files, and wait for a poll, we've got a working
set up [#process]_.
@@ -219,12 +227,10 @@
False
>>> db.close()
- >>> db.databases['async'].close()
>>> import shutil
>>> shutil.rmtree(dir)
-Hopefully zc.async will be an easy-to-configure, easy-to-use, and useful tool
-for you! Good luck!
+These instructions are very similar to the `Two Database Set Up`_.
.. ......... ..
.. Footnotes ..
@@ -311,4 +317,4 @@
... time.sleep(0.5)
... else:
... assert False, 'job never completed'
- ...
\ No newline at end of file
+ ...
Added: zc.async/trunk/src/zc/async/README_3b.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_3b.txt (rev 0)
+++ zc.async/trunk/src/zc/async/README_3b.txt 2008-04-23 20:26:32 UTC (rev 85665)
@@ -0,0 +1,241 @@
+-------------------
+Two Database Set Up
+-------------------
+
+Even though it is a bit more trouble to set up, large-scale production usage
+will probably prefer to use this approach, over the shared single database
+described above.
+
+For our zope.conf, we only need one additional stanza to the one seen above::
+
+ <zodb async>
+ <filestorage>
+ create true
+ path REPLACE_THIS_WITH_PATH_TO_STORAGE
+ </filestorage>
+ </zodb>
+
+(You would replace "REPLACE_THIS_WITH_PATH_TO_STORAGE" with the path to the
+storage file.)
+
+As before, you will probably prefer to use ZEO rather than FileStorage in
+production.
+
+The zdaemon.conf instructions are the same: set the ZC_ASYNC_UUID environment
+variable properly in the zdaemon.conf file.
+
+For our site.zcml, the only difference is that we use the
+multidb_dispatcher_policy.zcml file rather than the
+basic_dispatcher_policy.zcml file.
+
+If you want to change policy, change "multidb_dispatcher_policy.zcml" to
+"dispatcher.zcml" in the example above and register your replacement bits for
+the policy in "multidb_dispatcher_policy.zcml". You'll see that most of that
+comes from code in subscribers.py, which can be adjusted easily.
+
+If we process the files described above, and wait for a poll, we've got a
+working set up [#process_multi]_.
+
+ >>> import zc.async.dispatcher
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> import pprint
+ >>> pprint.pprint(get_poll(0))
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
+ >>> bool(dispatcher.activated)
+ True
+
+As before, we can ask for a job to be performed, and get the result.
+
+ >>> conn = db.open()
+ >>> root = conn.root()
+ >>> import zc.async.interfaces
+ >>> queue = zc.async.interfaces.IQueue(root)
+ >>> import operator
+ >>> import zc.async.job
+ >>> job = queue.put(zc.async.job.Job(operator.mul, 21, 2))
+ >>> import transaction
+ >>> transaction.commit()
+ >>> wait_for_result(job)
+ 42
+
+Hopefully zc.async will be an easy-to-configure, easy-to-use, and useful tool
+for you! Good luck! [#shutdown]_
+
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
+.. [#process_multi]
+
+ >>> import errno, os, random, socket, tempfile
+ >>> dir = tempfile.mkdtemp()
+ >>> site_zcml_file = os.path.join(dir, 'site.zcml')
+
+ >>> s = socket.socket()
+ >>> for i in range(20):
+ ... monitor_port = random.randint(20000, 49151)
+ ... try:
+ ... s.bind(('127.0.0.1', monitor_port))
+ ... except socket.error, e:
+ ... if e.args[0] == errno.EADDRINUSE:
+ ... pass
+ ... else:
+ ... raise
+ ... else:
+ ... s.close()
+ ... break
+ ... else:
+ ... assert False, 'could not find available port'
+ ... monitor_port = None
+ ...
+
+ >>> zope_conf = """
+ ... site-definition %(site_zcml_file)s
+ ...
+ ... <zodb main>
+ ... <filestorage>
+ ... create true
+ ... path %(main_storage_path)s
+ ... </filestorage>
+ ... </zodb>
+ ...
+ ... <zodb async>
+ ... <filestorage>
+ ... create true
+ ... path %(async_storage_path)s
+ ... </filestorage>
+ ... </zodb>
+ ...
+ ... <product-config zc.z3monitor>
+ ... port %(monitor_port)s
+ ... </product-config>
+ ...
+ ... <logger>
+ ... level debug
+ ... name zc.async
+ ... propagate no
+ ...
+ ... <logfile>
+ ... path %(async_event_log)s
+ ... </logfile>
+ ... </logger>
+ ...
+ ... <logger>
+ ... level debug
+ ... name zc.async.trace
+ ... propagate no
+ ...
+ ... <logfile>
+ ... path %(async_trace_log)s
+ ... </logfile>
+ ... </logger>
+ ...
+ ... <eventlog>
+ ... <logfile>
+ ... formatter zope.exceptions.log.Formatter
+ ... path STDOUT
+ ... </logfile>
+ ... <logfile>
+ ... formatter zope.exceptions.log.Formatter
+ ... path %(event_log)s
+ ... </logfile>
+ ... </eventlog>
+ ... """ % {'site_zcml_file': site_zcml_file,
+ ... 'main_storage_path': os.path.join(dir, 'main.fs'),
+ ... 'async_storage_path': os.path.join(dir, 'async.fs'),
+ ... 'monitor_port': monitor_port,
+ ... 'event_log': os.path.join(dir, 'z3.log'),
+ ... 'async_event_log': os.path.join(dir, 'async.log'),
+ ... 'async_trace_log': os.path.join(dir, 'async_trace.log'),}
+ ...
+
+ >>> os.environ['ZC_ASYNC_UUID'] = os.path.join(dir, 'uuid.txt')
+
+ >>> site_zcml = """
+ ... <configure xmlns='http://namespaces.zope.org/zope'
+ ... xmlns:meta="http://namespaces.zope.org/meta"
+ ... >
+ ... <include package="zope.component" file="meta.zcml" />
+ ... <include package="zope.component" />
+ ... <include package="zc.z3monitor" />
+ ... <include package="zc.async" file="multidb_dispatcher_policy.zcml" />
+ ...
+ ... <!-- this is usually handled in Zope applications by the
+ ... zope.app.keyreference.persistent.connectionOfPersistent adapter -->
+ ... <adapter factory="zc.twist.connection" />
+ ... </configure>
+ ... """
+
+ >>> zope_conf_file = os.path.join(dir, 'zope.conf')
+ >>> f = open(zope_conf_file, 'w')
+ >>> f.write(zope_conf)
+ >>> f.close()
+ >>> f = open(site_zcml_file, 'w')
+ >>> f.write(site_zcml)
+ >>> f.close()
+
+ >>> import zdaemon.zdoptions
+ >>> import zope.app.appsetup
+ >>> options = zdaemon.zdoptions.ZDOptions()
+ >>> options.schemadir = os.path.join(
+ ... os.path.dirname(os.path.abspath(zope.app.appsetup.__file__)),
+ ... 'schema')
+ >>> options.realize(['-C', zope_conf_file])
+ >>> config = options.configroot
+
+ >>> import zope.app.appsetup.product
+ >>> zope.app.appsetup.product.setProductConfigurations(
+ ... config.product_config)
+ >>> ignore = zope.app.appsetup.config(config.site_definition)
+ >>> import zope.app.appsetup.appsetup
+ >>> db = zope.app.appsetup.appsetup.multi_database(config.databases)[0][0]
+
+ >>> import zope.event
+ >>> import zc.async.interfaces
+ >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+
+ >>> import time
+ >>> def get_poll(count=None): # just a helper used later, not processing
+ ... if count is None:
+ ... count = len(dispatcher.polls)
+ ... for i in range(30):
+ ... if len(dispatcher.polls) > count:
+ ... return dispatcher.polls.first()
+ ... time.sleep(0.1)
+ ... else:
+ ... assert False, 'no poll!'
+ ...
+
+ >>> def wait_for_result(job): # just a helper used later, not processing
+ ... for i in range(30):
+ ... t = transaction.begin()
+ ... if job.status == zc.async.interfaces.COMPLETED:
+ ... return job.result
+ ... time.sleep(0.5)
+ ... else:
+ ... assert False, 'job never completed'
+ ...
+
+.. [#shutdown]
+
+ >>> import zc.async.dispatcher
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ >>> dispatcher.thread.join(3)
+
+ >>> t = transaction.begin() # sync
+ >>> import zope.component
+ >>> import zc.async.interfaces
+ >>> uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+ >>> da = queue.dispatchers[uuid]
+ >>> bool(da.activated)
+ False
+
+ >>> db.close()
+ >>> db.databases['async'].close()
+ >>> import shutil
+ >>> shutil.rmtree(dir)
\ No newline at end of file
Property changes on: zc.async/trunk/src/zc/async/README_3b.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/TODO.txt 2008-04-23 20:26:32 UTC (rev 85665)
@@ -9,4 +9,5 @@
For some other package, maybe:
- TTW Management and logging views, as in zasync (see goals in the "History"
- section of the README).
\ No newline at end of file
+ section of the README).
+
\ No newline at end of file
Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/dispatcher.py 2008-04-23 20:26:32 UTC (rev 85665)
@@ -57,6 +57,9 @@
def getJob(self):
return self.job
+ def getQueue(self):
+ return self.job.queue
+
def getDispatcher(self):
return self.dispatcher
@@ -123,7 +126,7 @@
info['thread'] = thread.get_ident()
info['started'] = datetime.datetime.utcnow()
zc.async.utils.tracelog.info(
- 'starting in thread %d: %r',
+ 'starting in thread %d: %s',
info['thread'], info['call'])
try:
transaction.begin()
@@ -138,13 +141,35 @@
except ZODB.POSException.TransactionError:
transaction.abort()
while 1:
+ job.fail()
try:
- job.fail()
transaction.commit()
except ZODB.POSException.TransactionError:
transaction.abort() # retry forever (!)
else:
break
+ except zc.async.interfaces.BadStatusError:
+ transaction.abort()
+ zc.async.utils.log.error( # notice, not tracelog
+ 'job already completed?', exc_info=True)
+ if job.status == zc.async.interfaces.CALLBACKS:
+ job.resumeCallbacks() # moves the job off the agent
+ else:
+ while 1:
+ status = job.status
+ if status == zc.async.interfaces.COMPLETED:
+ if zc.async.interfaces.IAgent.providedBy(
+ job.parent):
+ job.parent.jobCompleted(job)
+ # moves the job off the agent
+ else:
+ job.fail() # moves the job off the agent
+ try:
+ transaction.commit()
+ except ZODB.POSException.TransactionError:
+ transaction.abort() # retry forever (!)
+ else:
+ break
# should come before 'completed' for threading dance
if isinstance(job.result, twisted.python.failure.Failure):
info['failed'] = True
@@ -156,14 +181,9 @@
finally:
local.job = None
transaction.abort()
- if info['failed']:
- zc.async.utils.tracelog.error(
- '%s failed in thread %d with traceback:\n%s',
- info['call'], info['thread'], info['result'])
- else:
- zc.async.utils.tracelog.info(
- '%s succeeded in thread %d with result:\n%s',
- info['call'], info['thread'], info['result'])
+ zc.async.utils.tracelog.info(
+ 'completed in thread %d: %s',
+ info['thread'], info['call'])
job = self.queue.get()
finally:
conn.close()
@@ -219,6 +239,8 @@
activated = False
conn = None
+ thread = None # this is just a placeholder that other code can use the
+ # way that zc.async.subscribers.ThreadedDispatcherInstaller.__call__ does.
def __init__(self, db, reactor, poll_interval=5, uuid=None):
if uuid is None:
Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/dispatcher.txt 2008-04-23 20:26:32 UTC (rev 85665)
@@ -248,21 +248,33 @@
We also have some log entries.
- >>> info = debug = None
+ >>> info1 = info2 = info3 = debug = None
>>> for r in reversed(trace_logs.records):
- ... if info is None and r.levelname == 'INFO':
- ... info = r
- ... elif debug is None and r.levelname == 'DEBUG':
+ ... if r.levelname == 'INFO':
+ ... if info3 is None:
+ ... info3 = r
+ ... elif info2 is None:
+ ... info2 = r
+ ... elif info1 is None:
+ ... info1 = r
+ ... elif r.levelname == 'DEBUG' and debug is None:
... debug = r
- ... elif info is not None and debug is not None:
+ ... if (info1 is not None and info2 is not None and
+ ... info3 is not None and debug is not None):
... break
... else:
... assert False, 'could not find'
...
- >>> print info.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ >>> print info1.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ starting in thread ...: <zc.async.job.Job (oid ..., db 'unnamed')
+ ``<built-in function mul>(14, 3)``>
+ >>> print info2.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<zc.async.job.Job (oid ..., db 'unnamed')
- ``<built-in function mul>(14, 3)``> succeeded in thread ... with result:
+ ``<built-in function mul>(14, 3)``> succeeded with result:
42
+ >>> print info3.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ completed in thread ...: <zc.async.job.Job (oid ..., db 'unnamed')
+ ``<built-in function mul>(14, 3)``>
>>> print debug.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
poll ...:
@@ -307,15 +319,47 @@
>>> length()
4
-``zc.async.local`` also allows some fun tricks. Your method can access the
-job--for instance, to access the queue and put another job in, or access
-annotations on the job, as of the last database sync for the thread's
-connection (at transaction boundaries).
+If the queue hands out a job that has already been completed somehow, the
+dispatcher will be OK with it, logging the problem to the main event log
+and moving on.
+ >>> badjob = queue.put(zc.async.job.Job(operator.mul, 21, 2))
+ >>> badjob.fail()
+ >>> badjob.status == zc.async.interfaces.COMPLETED
+ True
+ >>> flagjob = queue.put(zc.async.job.Job(operator.mul, 21, 2))
+ >>> transaction.commit()
+ >>> wait_for_result(flagjob)
+ 42
+ >>> for r in reversed(event_logs.records):
+ ... if r.levelname == 'ERROR':
+ ... break
+ ... else:
+ ... assert False, 'did not find log'
+ ...
+ >>> print r.getMessage()
+ job already completed?
+ >>> import traceback, sys
+ >>> traceback.print_exception(*(r.exc_info + (None, sys.stdout)))
+ ... # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ BadStatusError: can only call a job with NEW or ASSIGNED status
+
+Importantly, the job is also moved off of the agent.
+
+ >>> badjob.parent # doctest: +ELLIPSIS
+ <zc.async.agent.Agent object at 0x...>
+ >>> len(badjob.parent)
+ 0
+
+``zc.async.local`` also allows some fun tricks. Your callable can access the
+queue, perhaps to put another job in.
+
>>> import zc.async
>>> def hand_off():
- ... job = zc.async.local.getJob()
- ... return job.queue.put(zc.async.job.Job(operator.mul, 21, 2))
+ ... queue = zc.async.local.getQueue()
+ ... return queue.put(zc.async.job.Job(operator.mul, 21, 2))
...
>>> job3 = queue.put(hand_off)
>>> transaction.commit()
@@ -323,6 +367,10 @@
>>> wait_for_result(job3)
42
+You can also access the job--for instance, to access annotations on the job, as
+of the last database sync for the thread's connection (at transaction
+boundaries). This function is ``zc.async.local.getJob()``, and is seen below.
+
It can also get and set job annotations *live, in another connection*.
This allows you to send messages about job progress, or get live
information about whether you should change or stop your work, for
@@ -356,17 +404,17 @@
analysis.
>>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
- {'failed': 1,
+ {'failed': 2,
'longest active': None,
'longest failed': ('\x00...', 'unnamed'),
'longest successful': ('\x00...', 'unnamed'),
'shortest active': None,
'shortest failed': ('\x00...', 'unnamed'),
'shortest successful': ('\x00...', 'unnamed'),
- 'started': 6,
+ 'started': 8,
'statistics end': datetime.datetime(...),
'statistics start': datetime.datetime(...),
- 'successful': 5,
+ 'successful': 6,
'unknown': 0}
We can get a report on the reactor's status.
@@ -476,7 +524,7 @@
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<zc.async.job.Job (oid ..., db 'unnamed')
- ``<built-in function mul>(14, None)``> failed in thread ... with traceback:
+ ``<built-in function mul>(14, None)``> failed with traceback:
*--- Failure #... (pickled) ---
.../zc/async/job.py:...: _call_with_retry(...)
[ Locals ]...
Modified: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/interfaces.py 2008-04-23 20:26:32 UTC (rev 85665)
@@ -313,6 +313,15 @@
queue on the `assignerUUID`.
"""
+ def pull(index=0):
+ """Remove and return a job, by default from the front of the queue.
+
+ Raise IndexError if index does not exist.
+
+ This is the blessed way to remove an unclaimed job from the queue so
+ that dispatchers will not try to perform it.
+ """
+
def claim(filter=None, default=None):
"""returns first due job that is available for the given filter,
removing it from the queue as appropriate; or None, if none are
Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/job.py 2008-04-23 20:26:32 UTC (rev 85665)
@@ -318,10 +318,22 @@
def _complete(self, res, tm):
if isinstance(res, twisted.python.failure.Failure):
res = zc.twist.sanitize(res)
+ failure = True
+ else:
+ failure = False
self._result = res
self._status = zc.async.interfaces.CALLBACKS
self._active_end = datetime.datetime.now(pytz.UTC)
tm.commit()
+ if failure:
+ zc.async.utils.tracelog.error(
+ '%r failed with traceback:\n%s',
+ self,
+ res.getTraceback(elideFrameworkCode=True, detail='verbose'))
+ else:
+ zc.async.utils.tracelog.info(
+ '%r succeeded with result:\n%r',
+ self, res)
return res
def fail(self, e=None):
@@ -349,21 +361,10 @@
zc.async.utils.tracelog.debug(
'starting callback %r to %r', j, self)
j(self.result)
- if isinstance(j.result, twisted.python.failure.Failure):
- zc.async.utils.tracelog.error(
- 'callback %r to %r failed with traceback:\n%s',
- j, self, j.result.getTraceback(
- elideFrameworkCode=True, detail='verbose'))
- else:
- zc.async.utils.tracelog.info(
- 'callback %r to %r succeeded with result:\n%s',
- j, self, j.result)
elif j._status == zc.async.interfaces.ACTIVE:
zc.async.utils.tracelog.debug(
'failing aborted callback %r to %r', j, self)
j.fail()
- zc.async.utils.tracelog.error(
- 'failed aborted callback %r to %r', j, self)
elif j._status == zc.async.interfaces.CALLBACKS:
j.resumeCallbacks()
# TODO: this shouldn't raise anything we want to catch, right?
Modified: zc.async/trunk/src/zc/async/job.txt
===================================================================
--- zc.async/trunk/src/zc/async/job.txt 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/job.txt 2008-04-23 20:26:32 UTC (rev 85665)
@@ -898,26 +898,43 @@
...
exceptions.RuntimeError:
-.. [#callback_log] The callbacks are logged
+.. [#callback_log] The callbacks are logged.
>>> for r in reversed(trace_logs.records):
+ ... if r.levelname == 'DEBUG':
+ ... break
+ ... else:
+ ... assert False, 'could not find log'
+ ...
+ >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ starting callback
+ <...Job (oid ..., db 'unnamed')
+ ``...completeStartedJobArguments(...Job (oid ..., db 'unnamed'))``>
+ to
+ <...Job (oid ..., db 'unnamed')
+ ``...success_or_failure(...Job (oid ..., db 'unnamed'),
+ ...Job (oid ..., db 'unnamed'))``>
+
+
+ As with all job calls, the results are logged.
+
+ >>> for r in reversed(trace_logs.records):
... if r.levelname == 'INFO':
... break
... else:
... assert False, 'could not find log'
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- callback <zc.async.job.Job (oid ..., db 'unnamed')
- ``zc.async.job.success_or_failure(zc.async.job.Job (oid ..., db 'unnamed'),
- zc.async.job.Job (oid ..., db 'unnamed'))``>
- to <zc.async.job.Job (oid ..., db 'unnamed')
- ``zc.async.doctest_test.multiply(5, 3)``> succeeded with result:
+ <zc.async.job.Job (oid ..., db 'unnamed')
+ ``zc.async.job.completeStartedJobArguments(zc.async.job.Job
+ (oid ..., db 'unnamed'))``>
+ succeeded with result:
None
+.. [#errback_log] As with all job calls, the results are logged. If the
+ callback receives a failure, but does not fail itself, the log is still
+ just at an "info" level.
-.. [#errback_log] If the callback receives a failure, but does not fail itself,
- the log is still just at an "info" level.
-
>>> for r in reversed(trace_logs.records):
... if r.levelname == 'INFO':
... break
@@ -925,15 +942,14 @@
... assert False, 'could not find log'
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- callback <zc.async.job.Job (oid ..., db 'unnamed')
- ``zc.async.job.success_or_failure(zc.async.job.Job (oid ..., db 'unnamed'),
- zc.async.job.Job (oid ..., db 'unnamed'))``>
- to <zc.async.job.Job (oid ..., db 'unnamed')
- ``zc.async.doctest_test.multiply(..., None)``> succeeded with result:
+ <zc.async.job.Job (oid 114, db 'unnamed')
+ ``zc.async.job.completeStartedJobArguments(zc.async.job.Job
+ (oid 109, db 'unnamed'))``>
+ succeeded with result:
None
- However, the callbacks that fail themselves are logged at the "error" level
- and include a detailed traceback.
+ However, the callbacks that fail themselves are logged (again, as all jobs
+ are) at the "error" level and include a detailed traceback.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 4)
>>> transaction.commit()
@@ -948,7 +964,7 @@
... assert False, 'could not find log'
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- callback <...Job...``zc.async.doctest_test.multiply()``> ... failed with traceback:
+ <...Job...``zc.async.doctest_test.multiply()``> failed with traceback:
*--- Failure #... (pickled) ---
.../zc/async/job.py:...
[ Locals...
@@ -966,13 +982,14 @@
job failed is logged.
>>> for r in reversed(trace_logs.records):
- ... if r.levelname == 'ERROR':
+ ... if r.levelname == 'DEBUG' and r.getMessage().startswith('failing'):
... break
... else:
... assert False, 'could not find log'
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- failed aborted callback <...Job...``zc.async.doctest_test.multiply(5)``> to <...Job...>
+ failing aborted callback
+ <...Job...``zc.async.doctest_test.multiply(5)``> to <...Job...>
.. [#call_self] Here's a job trying to call itself.
Modified: zc.async/trunk/src/zc/async/queue.py
===================================================================
--- zc.async/trunk/src/zc/async/queue.py 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/queue.py 2008-04-23 20:26:32 UTC (rev 85665)
@@ -319,9 +319,9 @@
return default
uuid = None
for pop, ix, job in self._iter():
- res = None
if job.begin_after > now:
break
+ res = None
quotas = []
if (job.begin_after + job.begin_by) < now:
res = zc.async.interfaces.IJob(
Modified: zc.async/trunk/src/zc/async/subscribers.py
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.py 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/subscribers.py 2008-04-23 20:26:32 UTC (rev 85665)
@@ -81,7 +81,12 @@
def start():
dispatcher.activate()
reactor.run(installSignalHandlers=0)
- thread = threading.Thread(target=start)
+ # we stash the thread object on the dispatcher so functional tests
+ # can do the following at the end of a test:
+ # dispatcher = zc.async.dispatcher.get()
+ # dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ # dispatcher.thread.join(3)
+ dispatcher.thread = thread = threading.Thread(target=start)
thread.setDaemon(True)
thread.start()
Modified: zc.async/trunk/src/zc/async/subscribers.txt
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.txt 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/subscribers.txt 2008-04-23 20:26:32 UTC (rev 85665)
@@ -35,6 +35,13 @@
>>> get_poll(0)
{}
+The function also stashed the thread on the dispatcher, so we can write tests
+that can grab it easily.
+
+ >>> import threading
+ >>> isinstance(dispatcher.thread, threading.Thread)
+ True
+
The function also installs some signal handlers to optimize shutdown. We'll
look at them soon. For now, let's install some queues.
Modified: zc.async/trunk/src/zc/async/z3tests.py
===================================================================
--- zc.async/trunk/src/zc/async/z3tests.py 2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/z3tests.py 2008-04-23 20:26:32 UTC (rev 85665)
@@ -17,6 +17,11 @@
f.close()
zc.async.instanceuuid.UUID = zc.async.instanceuuid.getUUID()
+def tearDown(test):
+ import zc.async.dispatcher
+ zc.async.dispatcher.pop()
+ zope.component.testing.tearDown(test)
+
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite(
@@ -25,8 +30,9 @@
optionflags=doctest.INTERPRET_FOOTNOTES),
doctest.DocFileSuite(
'README_3.txt',
+ 'README_3b.txt',
setUp=zope.component.testing.setUp,
- tearDown=zope.component.testing.tearDown,
+ tearDown=tearDown,
optionflags=doctest.INTERPRET_FOOTNOTES),
))