[Checkins] SVN: zc.async/trunk/s Multiple small changes.

Gary Poster gary at zope.com
Wed Apr 23 16:26:33 EDT 2008


Log message for revision 85665:
  Multiple small changes.
  
  - Callbacks are logged at start in the trace log.
  
  - All job results (including callbacks) are logged, including verbose
    tracebacks if the callback generated a failure.
  
  - Had the ThreadedDispatcherInstaller subscriber stash the thread on the
    dispatcher, so you can shut down tests like this:
    
    >>> import zc.async.dispatcher
    >>> dispatcher = zc.async.dispatcher.get()
    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
    >>> dispatcher.thread.join(3)
  
  - Added ``getQueue`` to zc.async.local as a convenience (it does what you
    could already do: ``zc.async.local.getJob().queue``); see the sketch after
    this list.
  
  - Clarified in the interfaces and README that ``IQueue.pull`` is the approved
    way of removing scheduled jobs from a queue (see the example after this
    list).
  
  - Reports in the logs of a job's success or failure now come before callbacks
    are started.
  
  - Added a section showing how the basic_dispatcher_policy.zcml configuration
    works (sketched after this list), which then pushed the former README_3
    examples into README_3b.
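  
  As a minimal sketch of the ``getQueue`` convenience (mirroring the example
  added to dispatcher.txt in this revision), a job's callable can reach its
  queue directly, for instance to enqueue a follow-up job:
  
    >>> import operator
    >>> import zc.async
    >>> import zc.async.job
    >>> def hand_off():
    ...     queue = zc.async.local.getQueue()
    ...     return queue.put(zc.async.job.Job(operator.mul, 21, 2))
    ...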
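  
  And a sketch of the ``pull`` usage clarified in the README, assuming
  ``queue`` is an ``IQueue`` and ``send_message`` is a persistable callable,
  as in the README example:
  
    >>> job = queue.put(send_message)
    >>> job is queue.pull()
    True
    >>> len(queue)
    0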
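  
  The new section's set up boils down to including the basic policy in the
  site.zcml-alike; the include line (the full configuration is in the README_3
  diff below) is:
  
    <include package="zc.async" file="basic_dispatcher_policy.zcml" />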
  
  

Changed:
  U   zc.async/trunk/setup.py
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/README.txt
  U   zc.async/trunk/src/zc/async/README_3.txt
  A   zc.async/trunk/src/zc/async/README_3b.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/dispatcher.txt
  U   zc.async/trunk/src/zc/async/interfaces.py
  U   zc.async/trunk/src/zc/async/job.py
  U   zc.async/trunk/src/zc/async/job.txt
  U   zc.async/trunk/src/zc/async/queue.py
  U   zc.async/trunk/src/zc/async/subscribers.py
  U   zc.async/trunk/src/zc/async/subscribers.txt
  U   zc.async/trunk/src/zc/async/z3tests.py

-=-
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/setup.py	2008-04-23 20:26:32 UTC (rev 85665)
@@ -5,7 +5,8 @@
 long_description = (
     open('src/zc/async/README.txt').read() + "\n" +
     open('src/zc/async/README_2.txt').read() + "\n" +
-    open('src/zc/async/README_3.txt').read() +
+    open('src/zc/async/README_3.txt').read() + "\n" +
+    open('src/zc/async/README_3b.txt').read() +
     "\n\n=======\nChanges\n=======\n\n" +
     open('src/zc/async/CHANGES.txt').read() + "\n")
 

Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-04-23 20:26:32 UTC (rev 85665)
@@ -10,9 +10,31 @@
 
 - README improved (thanks to Benji York and Sebastian Ware).
 
-- Callbacks are logged at start and finish in the trace log, including
-  verbose tracebacks if the callback generated a failure.
+- Callbacks are logged at start in the trace log.
 
+- All job results (including callbacks) are logged, including verbose
+  tracebacks if the callback generated a failure.
+
+- Had the ThreadedDispatcherInstaller subscriber stash the thread on the
+  dispatcher, so you can shut down tests like this:
+  
+  >>> import zc.async.dispatcher
+  >>> dispatcher = zc.async.dispatcher.get()
+  >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+  >>> dispatcher.thread.join(3)
+
+- Added ``getQueue`` to zc.async.local as a convenience (it does what you
+  could already do: ``zc.async.local.getJob().queue``).
+
+- Clarified in the interfaces and README that ``IQueue.pull`` is the approved
+  way of removing scheduled jobs from a queue.
+
+- Reports in the logs of a job's success or failure now come before callbacks
+  are started.
+
+- Added a section showing how the basic_dispatcher_policy.zcml configuration
+  works, which then pushed the former README_3 examples into README_3b.
+
 1.0 (2008-04-09)
 ================
 

Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/README.txt	2008-04-23 20:26:32 UTC (rev 85665)
@@ -185,9 +185,9 @@
     ['']
     >>> queue = queues['']
 
----------
-queue.put
----------
+-------------
+``queue.put``
+-------------
 
 Now we want to actually get some work done.  The simplest case is simple
 to perform: pass a persistable callable to the queue's ``put`` method and
@@ -244,12 +244,29 @@
 lambdas and other functions created dynamically. As we'll see below, the job
 instance can help us out there somewhat by offering closure-like features.
 
+--------------
+``queue.pull``
+--------------
+
+If you put a job into a queue, it has not yet been claimed, and you want to
+cancel it, ``pull`` it from the queue.
+
+    >>> len(queue)
+    0
+    >>> job = queue.put(send_message)
+    >>> len(queue)
+    1
+    >>> job is queue.pull()
+    True
+    >>> len(queue)
+    0
+
 ---------------
 Scheduled Calls
 ---------------
 
-You can also pass a datetime.datetime to schedule a call.  A datetime
-without a timezone is considered to be in the UTC timezone.
+When using ``put``, you can also pass a datetime.datetime to schedule a call. A
+datetime without a timezone is considered to be in the UTC timezone.
 
     >>> t = transaction.begin()
     >>> import datetime
@@ -281,10 +298,10 @@
 has already timed out, in which case the job fails with an
 abort [#already_passed_timed_out]_.
 
-The queue's `put` method is the essential API.  Other methods are used
-to introspect, but are not needed for basic usage.
+The queue's ``put`` method is the essential API. ``pull`` is used rarely. Other
+methods are used to introspect, but are not needed for basic usage.
 
-But what is that result of the `put` call in the examples above?  A
+But what is that result of the ``put`` call in the examples above?  A
 job?  What do you do with that?
 
 Jobs

Modified: zc.async/trunk/src/zc/async/README_3.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_3.txt	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/README_3.txt	2008-04-23 20:26:32 UTC (rev 85665)
@@ -7,12 +7,20 @@
 conveniences to automate much of the configuration shown in the section
 discussing configuration without Zope 3.
 
-If you want to set up a client alone, without a dispatcher, include
-configure.zcml, make sure you share the database in which the queues will be
-held, and make sure that either the
-zope.app.keyreference.persistent.connectionOfPersistent adapter is registered,
-or zc.twist.connection.
+Client Set Up
+=============
 
+If you want to set up a client alone, without a dispatcher, include the egg in
+your setup.py, include the configure.zcml in your application's zcml, make sure
+you share the database in which the queues will be held, and make sure that
+either the zope.app.keyreference.persistent.connectionOfPersistent adapter or
+zc.twist.connection is registered.
+
+That should be it.
+
+Client/Server Set Up
+====================
+
 For a client/server combination, use zcml that is something like the
 basic_dispatcher_policy.zcml, make sure you have access to the database with
 the queues, configure logging and monitoring as desired, configure the
@@ -30,9 +38,21 @@
 the comments in the files that we set up, are the primary differences between
 our examples and a real set up.
 
+We'll do this in two versions.  The first version uses a single database, as
+you might do to get started quickly, or for a small site.  The second version
+has one database for the main application, and one database for the async data,
+as will be more appropriate for typical production usage.
+
+-----------------------------
+Shared Single Database Set Up
+-----------------------------
+
+As described above, using a shared single database will probably be the
+quickest way to get started.  Large-scale production usage will probably prefer
+to use the `Two Database Set Up`_ described later.
+
 So, without further ado, here is the text of our zope.conf-alike, and of our
-site.zcml-alike [#get_vals]_.  We'll be using two databases for this example,
-as you might want for a site with a fair amount of zc.async usage.
+site.zcml-alike [#get_vals]_.
 
     >>> zope_conf = """
     ... site-definition %(site_zcml_file)s
@@ -44,13 +64,6 @@
     ...   </filestorage>
     ... </zodb>
     ... 
-    ... <zodb async>
-    ...   <filestorage>
-    ...     create true
-    ...     path %(async_storage_path)s
-    ...   </filestorage>
-    ... </zodb>
-    ... 
     ... <product-config zc.z3monitor>
     ...   port %(monitor_port)s
     ... </product-config>
@@ -95,7 +108,7 @@
     ... 
 
 In a non-trivial production system you will also probably want to replace
-the two file storages with two <zeoclient> stanzas.
+the file storage with a <zeoclient> stanza.
 
 Also note that an open monitor port should be behind a firewall, of course.
 
@@ -123,7 +136,7 @@
     ... <include package="zope.component" file="meta.zcml" />
     ... <include package="zope.component" />
     ... <include package="zc.z3monitor" />
-    ... <include package="zc.async" file="multidb_dispatcher_policy.zcml" />
+    ... <include package="zc.async" file="basic_dispatcher_policy.zcml" />
     ...
     ... <!-- this is usually handled in Zope applications by the
     ...      zope.app.keyreference.persistent.connectionOfPersistent adapter -->
@@ -133,11 +146,6 @@
 
 Now we're done.
 
-If you want to change policy, change "multidb_dispatcher_policy.zcml" to
-"dispatcher.zcml" in the example above and register your replacement bits for
-the policy in "multidb_dispatcher_policy.zcml".  You'll see that most of that
-comes from code in subscribers.py, which can be adjusted easily.
-
 If we process these files, and wait for a poll, we've got a working
 set up [#process]_.
 
@@ -219,12 +227,10 @@
     False
 
     >>> db.close()
-    >>> db.databases['async'].close()
     >>> import shutil
     >>> shutil.rmtree(dir)
 
-Hopefully zc.async will be an easy-to-configure, easy-to-use, and useful tool
-for you! Good luck!
+These instructions are very similar to the `Two Database Set Up`_.
 
 .. ......... ..
 .. Footnotes ..
@@ -311,4 +317,4 @@
     ...         time.sleep(0.5)
     ...     else:
     ...         assert False, 'job never completed'
-    ...
\ No newline at end of file
+    ...

Added: zc.async/trunk/src/zc/async/README_3b.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_3b.txt	                        (rev 0)
+++ zc.async/trunk/src/zc/async/README_3b.txt	2008-04-23 20:26:32 UTC (rev 85665)
@@ -0,0 +1,241 @@
+-------------------
+Two Database Set Up
+-------------------
+
+Even though it is a bit more trouble to set up, large-scale production usage
+will probably prefer this approach over the shared single database described
+above.
+
+For our zope.conf, we only need to add one stanza to the version seen above::
+
+    <zodb async>
+      <filestorage>
+        create true
+        path REPLACE_THIS_WITH_PATH_TO_STORAGE
+      </filestorage>
+    </zodb>
+
+(You would replace "REPLACE_THIS_WITH_PATH_TO_STORAGE" with the path to the
+storage file.)
+
+As before, you will probably prefer to use ZEO rather than FileStorage in
+production.
+
+The zdaemon.conf instructions are the same: set the ZC_ASYNC_UUID environment
+variable properly in the zdaemon.conf file.
+
+For our site.zcml, the only difference is that we use the
+multidb_dispatcher_policy.zcml file rather than the
+basic_dispatcher_policy.zcml file.
+
+If you want to change the policy, change "multidb_dispatcher_policy.zcml" to
+"dispatcher.zcml" in the example above and register your own replacements for
+the policy bits in "multidb_dispatcher_policy.zcml".  You'll see that most of
+that policy comes from code in subscribers.py, which can be adjusted easily.
+
+If we process the files described above, and wait for a poll, we've got a
+working set up [#process_multi]_.
+
+    >>> import zc.async.dispatcher
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> import pprint
+    >>> pprint.pprint(get_poll(0))
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
+    >>> bool(dispatcher.activated)
+    True
+
+As before, we can ask for a job to be performed, and get the result.
+
+    >>> conn = db.open()
+    >>> root = conn.root()
+    >>> import zc.async.interfaces
+    >>> queue = zc.async.interfaces.IQueue(root)
+    >>> import operator
+    >>> import zc.async.job
+    >>> job = queue.put(zc.async.job.Job(operator.mul, 21, 2))
+    >>> import transaction
+    >>> transaction.commit()
+    >>> wait_for_result(job)
+    42
+
+Hopefully zc.async will be an easy-to-configure, easy-to-use, and useful tool
+for you! Good luck! [#shutdown]_
+
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
+.. [#process_multi]
+
+    >>> import errno, os, random, socket, tempfile
+    >>> dir = tempfile.mkdtemp()
+    >>> site_zcml_file = os.path.join(dir, 'site.zcml')
+
+    >>> s = socket.socket()
+    >>> for i in range(20):
+    ...     monitor_port = random.randint(20000, 49151)
+    ...     try:
+    ...         s.bind(('127.0.0.1', monitor_port))
+    ...     except socket.error, e:
+    ...         if e.args[0] == errno.EADDRINUSE:
+    ...             pass
+    ...         else:
+    ...             raise
+    ...     else:
+    ...         s.close()
+    ...         break
+    ... else:
+    ...     assert False, 'could not find available port'
+    ...     monitor_port = None
+    ...
+
+    >>> zope_conf = """
+    ... site-definition %(site_zcml_file)s
+    ...
+    ... <zodb main>
+    ...   <filestorage>
+    ...     create true
+    ...     path %(main_storage_path)s
+    ...   </filestorage>
+    ... </zodb>
+    ... 
+    ... <zodb async>
+    ...   <filestorage>
+    ...     create true
+    ...     path %(async_storage_path)s
+    ...   </filestorage>
+    ... </zodb>
+    ... 
+    ... <product-config zc.z3monitor>
+    ...   port %(monitor_port)s
+    ... </product-config>
+    ... 
+    ... <logger>
+    ...   level debug
+    ...   name zc.async
+    ...   propagate no
+    ... 
+    ...   <logfile>
+    ...     path %(async_event_log)s
+    ...   </logfile>
+    ... </logger>
+    ... 
+    ... <logger>
+    ...   level debug
+    ...   name zc.async.trace
+    ...   propagate no
+    ... 
+    ...   <logfile>
+    ...     path %(async_trace_log)s
+    ...   </logfile>
+    ... </logger>
+    ... 
+    ... <eventlog>
+    ...   <logfile>
+    ...     formatter zope.exceptions.log.Formatter
+    ...     path STDOUT
+    ...   </logfile>
+    ...   <logfile>
+    ...     formatter zope.exceptions.log.Formatter
+    ...     path %(event_log)s
+    ...   </logfile>
+    ... </eventlog>
+    ... """ % {'site_zcml_file': site_zcml_file,
+    ...        'main_storage_path': os.path.join(dir, 'main.fs'),
+    ...        'async_storage_path': os.path.join(dir, 'async.fs'),
+    ...        'monitor_port': monitor_port,
+    ...        'event_log': os.path.join(dir, 'z3.log'),
+    ...        'async_event_log': os.path.join(dir, 'async.log'),
+    ...        'async_trace_log': os.path.join(dir, 'async_trace.log'),}
+    ... 
+
+    >>> os.environ['ZC_ASYNC_UUID'] = os.path.join(dir, 'uuid.txt')
+
+    >>> site_zcml = """
+    ... <configure xmlns='http://namespaces.zope.org/zope'
+    ...            xmlns:meta="http://namespaces.zope.org/meta"
+    ...            >
+    ... <include package="zope.component" file="meta.zcml" />
+    ... <include package="zope.component" />
+    ... <include package="zc.z3monitor" />
+    ... <include package="zc.async" file="multidb_dispatcher_policy.zcml" />
+    ...
+    ... <!-- this is usually handled in Zope applications by the
+    ...      zope.app.keyreference.persistent.connectionOfPersistent adapter -->
+    ... <adapter factory="zc.twist.connection" />
+    ... </configure>
+    ... """
+
+    >>> zope_conf_file = os.path.join(dir, 'zope.conf')
+    >>> f = open(zope_conf_file, 'w')
+    >>> f.write(zope_conf)
+    >>> f.close()
+    >>> f = open(site_zcml_file, 'w')
+    >>> f.write(site_zcml)
+    >>> f.close()
+
+    >>> import zdaemon.zdoptions
+    >>> import zope.app.appsetup
+    >>> options = zdaemon.zdoptions.ZDOptions()
+    >>> options.schemadir = os.path.join(
+    ...     os.path.dirname(os.path.abspath(zope.app.appsetup.__file__)),
+    ...     'schema')
+    >>> options.realize(['-C', zope_conf_file])
+    >>> config = options.configroot
+
+    >>> import zope.app.appsetup.product
+    >>> zope.app.appsetup.product.setProductConfigurations(
+    ...     config.product_config)
+    >>> ignore = zope.app.appsetup.config(config.site_definition)
+    >>> import zope.app.appsetup.appsetup
+    >>> db = zope.app.appsetup.appsetup.multi_database(config.databases)[0][0]
+
+    >>> import zope.event
+    >>> import zc.async.interfaces
+    >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+
+    >>> import time
+    >>> def get_poll(count=None): # just a helper used later, not processing
+    ...     if count is None:
+    ...         count = len(dispatcher.polls)
+    ...     for i in range(30):
+    ...         if len(dispatcher.polls) > count:
+    ...             return dispatcher.polls.first()
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         assert False, 'no poll!'
+    ... 
+
+    >>> def wait_for_result(job): # just a helper used later, not processing
+    ...     for i in range(30):
+    ...         t = transaction.begin()
+    ...         if job.status == zc.async.interfaces.COMPLETED:
+    ...             return job.result
+    ...         time.sleep(0.5)
+    ...     else:
+    ...         assert False, 'job never completed'
+    ...
+
+.. [#shutdown]
+
+    >>> import zc.async.dispatcher
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+    >>> dispatcher.thread.join(3)
+
+    >>> t = transaction.begin() # sync
+    >>> import zope.component
+    >>> import zc.async.interfaces
+    >>> uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+    >>> da = queue.dispatchers[uuid]
+    >>> bool(da.activated)
+    False
+
+    >>> db.close()
+    >>> db.databases['async'].close()
+    >>> import shutil
+    >>> shutil.rmtree(dir)
\ No newline at end of file


Property changes on: zc.async/trunk/src/zc/async/README_3b.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-04-23 20:26:32 UTC (rev 85665)
@@ -9,4 +9,5 @@
 For some other package, maybe:
 
 - TTW Management and logging views, as in zasync (see goals in the "History"
-  section of the README).
\ No newline at end of file
+  section of the README).
+  
\ No newline at end of file

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-04-23 20:26:32 UTC (rev 85665)
@@ -57,6 +57,9 @@
     def getJob(self):
         return self.job
 
+    def getQueue(self):
+        return self.job.queue
+
     def getDispatcher(self):
         return self.dispatcher
 
@@ -123,7 +126,7 @@
                 info['thread'] = thread.get_ident()
                 info['started'] = datetime.datetime.utcnow()
                 zc.async.utils.tracelog.info(
-                    'starting in thread %d: %r',
+                    'starting in thread %d: %s',
                     info['thread'], info['call'])
                 try:
                     transaction.begin()
@@ -138,13 +141,35 @@
                     except ZODB.POSException.TransactionError:
                         transaction.abort()
                         while 1:
+                            job.fail()
                             try:
-                                job.fail()
                                 transaction.commit()
                             except ZODB.POSException.TransactionError:
                                 transaction.abort() # retry forever (!)
                             else:
                                 break
+                    except zc.async.interfaces.BadStatusError:
+                        transaction.abort()
+                        zc.async.utils.log.error( # notice, not tracelog
+                            'job already completed?', exc_info=True)
+                        if job.status == zc.async.interfaces.CALLBACKS:
+                            job.resumeCallbacks() # moves the job off the agent
+                        else:
+                            while 1:
+                                status = job.status
+                                if status == zc.async.interfaces.COMPLETED:
+                                    if zc.async.interfaces.IAgent.providedBy(
+                                        job.parent):
+                                        job.parent.jobCompleted(job)
+                                        # moves the job off the agent
+                                else:
+                                    job.fail() # moves the job off the agent
+                                try:
+                                    transaction.commit()
+                                except ZODB.POSException.TransactionError:
+                                    transaction.abort() # retry forever (!)
+                                else:
+                                    break
                     # should come before 'completed' for threading dance
                     if isinstance(job.result, twisted.python.failure.Failure):
                         info['failed'] = True
@@ -156,14 +181,9 @@
                 finally:
                     local.job = None
                     transaction.abort()
-                if info['failed']:
-                    zc.async.utils.tracelog.error(
-                        '%s failed in thread %d with traceback:\n%s',
-                        info['call'], info['thread'], info['result'])
-                else:
-                    zc.async.utils.tracelog.info(
-                        '%s succeeded in thread %d with result:\n%s',
-                        info['call'], info['thread'], info['result'])
+                zc.async.utils.tracelog.info(
+                    'completed in thread %d: %s',
+                    info['thread'], info['call'])
                 job = self.queue.get()
         finally:
             conn.close()
@@ -219,6 +239,8 @@
 
     activated = False
     conn = None
+    thread = None # placeholder that other code can set, the way that
+    # zc.async.subscribers.ThreadedDispatcherInstaller.__call__ does.
 
     def __init__(self, db, reactor, poll_interval=5, uuid=None):
         if uuid is None:

Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/dispatcher.txt	2008-04-23 20:26:32 UTC (rev 85665)
@@ -248,21 +248,33 @@
 
 We also have some log entries.
 
-    >>> info = debug = None
+    >>> info1 = info2 = info3 = debug = None
     >>> for r in reversed(trace_logs.records):
-    ...     if info is None and r.levelname == 'INFO':
-    ...         info = r
-    ...     elif debug is None and r.levelname == 'DEBUG':
+    ...     if r.levelname == 'INFO':
+    ...         if info3 is None:
+    ...             info3 = r
+    ...         elif info2 is None:
+    ...             info2 = r
+    ...         elif info1 is None:
+    ...             info1 = r
+    ...     elif r.levelname == 'DEBUG' and debug is None:
     ...         debug = r
-    ...     elif info is not None and debug is not None:
+    ...     if (info1 is not None and info2 is not None and
+    ...         info3 is not None and debug is not None):
     ...         break
     ... else:
     ...     assert False, 'could not find'
     ...
-    >>> print info.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    >>> print info1.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    starting in thread ...: <zc.async.job.Job (oid ..., db 'unnamed')
+                             ``<built-in function mul>(14, 3)``>
+    >>> print info2.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     <zc.async.job.Job (oid ..., db 'unnamed')
-     ``<built-in function mul>(14, 3)``> succeeded in thread ... with result:
+     ``<built-in function mul>(14, 3)``> succeeded with result:
     42
+    >>> print info3.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    completed in thread ...: <zc.async.job.Job (oid ..., db 'unnamed')
+                             ``<built-in function mul>(14, 3)``>
 
     >>> print debug.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     poll ...:
@@ -307,15 +319,47 @@
     >>> length()
     4
 
-``zc.async.local`` also allows some fun tricks.  Your method can access the
-job--for instance, to access the queue and put another job in, or access
-annotations on the job, as of the last database sync for the thread's
-connection (at transaction boundaries).
+If the queue hands out a job that has already been completed somehow, the
+dispatcher will be OK with it, logging the problem to the main event log
+and moving on.
 
+    >>> badjob = queue.put(zc.async.job.Job(operator.mul, 21, 2))
+    >>> badjob.fail()
+    >>> badjob.status == zc.async.interfaces.COMPLETED
+    True
+    >>> flagjob = queue.put(zc.async.job.Job(operator.mul, 21, 2))
+    >>> transaction.commit()
+    >>> wait_for_result(flagjob)
+    42
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'ERROR':
+    ...         break
+    ... else:
+    ...     assert False, 'did not find log'
+    ...
+    >>> print r.getMessage()
+    job already completed?
+    >>> import traceback, sys
+    >>> traceback.print_exception(*(r.exc_info + (None, sys.stdout)))
+    ... # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    BadStatusError: can only call a job with NEW or ASSIGNED status
+
+Importantly, the job is also moved off of the agent.
+
+    >>> badjob.parent # doctest: +ELLIPSIS
+    <zc.async.agent.Agent object at 0x...>
+    >>> len(badjob.parent)
+    0
+
+``zc.async.local`` also allows some fun tricks.  Your callable can access the
+queue, perhaps to put another job in.
+
     >>> import zc.async
     >>> def hand_off():
-    ...     job = zc.async.local.getJob()
-    ...     return job.queue.put(zc.async.job.Job(operator.mul, 21, 2))
+    ...     queue = zc.async.local.getQueue()
+    ...     return queue.put(zc.async.job.Job(operator.mul, 21, 2))
     ...
     >>> job3 = queue.put(hand_off)
     >>> transaction.commit()
@@ -323,6 +367,10 @@
     >>> wait_for_result(job3)
     42
 
+You can also access the job--for instance, to access annotations on the job, as
+of the last database sync for the thread's connection (at transaction
+boundaries).  This function is ``zc.async.local.getJob()``, and is seen below.
+
 It can also get and set job annotations *live, in another connection*. 
 This allows you to send messages about job progress, or get live
 information about whether you should change or stop your work, for
@@ -356,17 +404,17 @@
 analysis.
 
     >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
-    {'failed': 1,
+    {'failed': 2,
      'longest active': None,
      'longest failed': ('\x00...', 'unnamed'),
      'longest successful': ('\x00...', 'unnamed'),
      'shortest active': None,
      'shortest failed': ('\x00...', 'unnamed'),
      'shortest successful': ('\x00...', 'unnamed'),
-     'started': 6,
+     'started': 8,
      'statistics end': datetime.datetime(...),
      'statistics start': datetime.datetime(...),
-     'successful': 5,
+     'successful': 6,
      'unknown': 0}
 
 We can get a report on the reactor's status.
@@ -476,7 +524,7 @@
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     <zc.async.job.Job (oid ..., db 'unnamed')
-     ``<built-in function mul>(14, None)``> failed in thread ... with traceback:
+     ``<built-in function mul>(14, None)``> failed with traceback:
     *--- Failure #... (pickled) ---
     .../zc/async/job.py:...: _call_with_retry(...)
      [ Locals ]...

Modified: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/interfaces.py	2008-04-23 20:26:32 UTC (rev 85665)
@@ -313,6 +313,15 @@
         queue on the `assignerUUID`.
         """
 
+    def pull(index=0):
+        """Remove and return a job, by default from the front of the queue.
+
+        Raise IndexError if index does not exist.
+        
+        This is the blessed way to remove an unclaimed job from the queue so
+        that dispatchers will not try to perform it.
+        """
+
     def claim(filter=None, default=None):
         """returns first due job that is available for the given filter,
         removing it from the queue as appropriate; or None, if none are

Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/job.py	2008-04-23 20:26:32 UTC (rev 85665)
@@ -318,10 +318,22 @@
     def _complete(self, res, tm):
         if isinstance(res, twisted.python.failure.Failure):
             res = zc.twist.sanitize(res)
+            failure = True
+        else:
+            failure = False
         self._result = res
         self._status = zc.async.interfaces.CALLBACKS
         self._active_end = datetime.datetime.now(pytz.UTC)
         tm.commit()
+        if failure:
+            zc.async.utils.tracelog.error(
+                '%r failed with traceback:\n%s',
+                self,
+                res.getTraceback(elideFrameworkCode=True, detail='verbose'))
+        else:
+            zc.async.utils.tracelog.info(
+                '%r succeeded with result:\n%r',
+                self, res)
         return res
 
     def fail(self, e=None):
@@ -349,21 +361,10 @@
                     zc.async.utils.tracelog.debug(
                         'starting callback %r to %r', j, self)
                     j(self.result)
-                    if isinstance(j.result, twisted.python.failure.Failure):
-                        zc.async.utils.tracelog.error(
-                            'callback %r to %r failed with traceback:\n%s',
-                            j, self, j.result.getTraceback(
-                                elideFrameworkCode=True, detail='verbose'))
-                    else:
-                        zc.async.utils.tracelog.info(
-                            'callback %r to %r succeeded with result:\n%s',
-                            j, self, j.result)
                 elif j._status == zc.async.interfaces.ACTIVE:
                     zc.async.utils.tracelog.debug(
                         'failing aborted callback %r to %r', j, self)
                     j.fail()
-                    zc.async.utils.tracelog.error(
-                        'failed aborted callback %r to %r', j, self)
                 elif j._status == zc.async.interfaces.CALLBACKS:
                     j.resumeCallbacks()
                 # TODO: this shouldn't raise anything we want to catch, right?

Modified: zc.async/trunk/src/zc/async/job.txt
===================================================================
--- zc.async/trunk/src/zc/async/job.txt	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/job.txt	2008-04-23 20:26:32 UTC (rev 85665)
@@ -898,26 +898,43 @@
     ...
     exceptions.RuntimeError:
 
-.. [#callback_log] The callbacks are logged
+.. [#callback_log] The callbacks are logged.
 
     >>> for r in reversed(trace_logs.records):
+    ...     if r.levelname == 'DEBUG':
+    ...         break
+    ... else:
+    ...     assert False, 'could not find log'
+    ...
+    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    starting callback
+     <...Job (oid ..., db 'unnamed')
+      ``...completeStartedJobArguments(...Job (oid ..., db 'unnamed'))``>
+    to
+     <...Job (oid ..., db 'unnamed')
+      ``...success_or_failure(...Job (oid ..., db 'unnamed'),
+                              ...Job (oid ..., db 'unnamed'))``>
+
+
+    As with all job calls, the results are logged.
+
+    >>> for r in reversed(trace_logs.records):
     ...     if r.levelname == 'INFO':
     ...         break
     ... else:
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    callback <zc.async.job.Job (oid ..., db 'unnamed')
-    ``zc.async.job.success_or_failure(zc.async.job.Job (oid ..., db 'unnamed'),
-      zc.async.job.Job (oid ..., db 'unnamed'))``>
-    to <zc.async.job.Job (oid ..., db 'unnamed')
-    ``zc.async.doctest_test.multiply(5, 3)``> succeeded with result:
+    <zc.async.job.Job (oid ..., db 'unnamed')
+     ``zc.async.job.completeStartedJobArguments(zc.async.job.Job
+                                                (oid ..., db 'unnamed'))``>
+    succeeded with result:
     None
 
+.. [#errback_log] As with all job calls, the results are logged. If the
+    callback receives a failure, but does not fail itself, the log is still
+    just at an "info" level.
 
-.. [#errback_log] If the callback receives a failure, but does not fail itself,
-    the log is still just at an "info" level.
-
     >>> for r in reversed(trace_logs.records):
     ...     if r.levelname == 'INFO':
     ...         break
@@ -925,15 +942,14 @@
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    callback <zc.async.job.Job (oid ..., db 'unnamed')
-    ``zc.async.job.success_or_failure(zc.async.job.Job (oid ..., db 'unnamed'),
-      zc.async.job.Job (oid ..., db 'unnamed'))``>
-    to <zc.async.job.Job (oid ..., db 'unnamed')
-    ``zc.async.doctest_test.multiply(..., None)``> succeeded with result:
+    <zc.async.job.Job (oid 114, db 'unnamed')
+     ``zc.async.job.completeStartedJobArguments(zc.async.job.Job
+                                                (oid 109, db 'unnamed'))``>
+    succeeded with result:
     None
 
-    However, the callbacks that fail themselves are logged at the "error" level
-    and include a detailed traceback.
+    However, the callbacks that fail themselves are logged (again, as all jobs
+    are) at the "error" level and include a detailed traceback.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 4)
     >>> transaction.commit()
@@ -948,7 +964,7 @@
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    callback <...Job...``zc.async.doctest_test.multiply()``> ... failed with traceback:
+    <...Job...``zc.async.doctest_test.multiply()``> failed with traceback:
     *--- Failure #... (pickled) ---
     .../zc/async/job.py:...
      [ Locals...
@@ -966,13 +982,14 @@
     job failed is logged.
 
     >>> for r in reversed(trace_logs.records):
-    ...     if r.levelname == 'ERROR':
+    ...     if r.levelname == 'DEBUG' and r.getMessage().startswith('failing'):
     ...         break
     ... else:
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    failed aborted callback <...Job...``zc.async.doctest_test.multiply(5)``> to <...Job...>
+    failing aborted callback
+    <...Job...``zc.async.doctest_test.multiply(5)``> to <...Job...>
 
 .. [#call_self] Here's a job trying to call itself.
 

Modified: zc.async/trunk/src/zc/async/queue.py
===================================================================
--- zc.async/trunk/src/zc/async/queue.py	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/queue.py	2008-04-23 20:26:32 UTC (rev 85665)
@@ -319,9 +319,9 @@
             return default
         uuid = None
         for pop, ix, job in self._iter():
-            res = None
             if job.begin_after > now:
                 break
+            res = None
             quotas = []
             if (job.begin_after + job.begin_by) < now:
                 res = zc.async.interfaces.IJob(

Modified: zc.async/trunk/src/zc/async/subscribers.py
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.py	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/subscribers.py	2008-04-23 20:26:32 UTC (rev 85665)
@@ -81,7 +81,12 @@
         def start():
             dispatcher.activate()
             reactor.run(installSignalHandlers=0)
-        thread = threading.Thread(target=start)
+        # we stash the thread object on the dispatcher so functional tests
+        # can do the following at the end of a test:
+        # dispatcher = zc.async.dispatcher.get()
+        # dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+        # dispatcher.thread.join(3)
+        dispatcher.thread = thread = threading.Thread(target=start)
         thread.setDaemon(True)
         thread.start()
     

Modified: zc.async/trunk/src/zc/async/subscribers.txt
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.txt	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/subscribers.txt	2008-04-23 20:26:32 UTC (rev 85665)
@@ -35,6 +35,13 @@
     >>> get_poll(0)
     {}
 
+The function also stashes the thread on the dispatcher, so we can write tests
+that can grab it easily.
+
+    >>> import threading
+    >>> isinstance(dispatcher.thread, threading.Thread)
+    True
+
 The function also installs some signal handlers to optimize shutdown.  We'll
 look at them soon.  For now, let's install some queues.
 

Modified: zc.async/trunk/src/zc/async/z3tests.py
===================================================================
--- zc.async/trunk/src/zc/async/z3tests.py	2008-04-23 19:22:55 UTC (rev 85664)
+++ zc.async/trunk/src/zc/async/z3tests.py	2008-04-23 20:26:32 UTC (rev 85665)
@@ -17,6 +17,11 @@
     f.close()
     zc.async.instanceuuid.UUID = zc.async.instanceuuid.getUUID()
 
+def tearDown(test):
+    import zc.async.dispatcher
+    zc.async.dispatcher.pop()
+    zope.component.testing.tearDown(test)
+
 def test_suite():
     return unittest.TestSuite((
         doctest.DocFileSuite(
@@ -25,8 +30,9 @@
             optionflags=doctest.INTERPRET_FOOTNOTES),
         doctest.DocFileSuite(
             'README_3.txt',
+            'README_3b.txt',
             setUp=zope.component.testing.setUp,
-            tearDown=zope.component.testing.tearDown,
+            tearDown=tearDown,
             optionflags=doctest.INTERPRET_FOOTNOTES),
         ))
 


