[Checkins] SVN: zc.async/trunk/s 1.4.0b1

Gary Poster gary at zope.com
Tue Jul 29 16:13:31 EDT 2008


Log message for revision 88979:
  1.4.0b1
  
  - Add warning about long commits to tips and tricks.
  
  - If a deactivated polling dispatcher turns out not to be dead after all,
    log a complaint and then reactivate it.  NEEDS TEST
  
  - No longer use an intermediate job to implement the success/failure
    addCallbacks behavior.  Introduce an ICallbackProxy that can be used for
    this kind of behavior instead.  This change was driven by two desires.
  
    - Don't log the intermediate result.  Logging it makes logs harder to
      read: pertinent data is duplicated, and hidden within unimportant
      differences between log entries.
  
    - Don't needlessly remember errors in success/failure callbacks.  Retaining
      them can cause spurious failures in unusual situations.
  
    The callback proxy accepts callbacks, which are added to the selected job
    (success or failure) when the job is selected.
  
    This change introduces some hopefully trivial incompatibilities, which
    basically come down to the callback being a proxy, not a real job. Use the
    convenience properties ``success`` and ``failure`` on the proxy to look at
    the respective jobs. After the proxy is evaluated, the ``job`` attribute
    will hold the job that was actually run. ``status`` and ``result`` are
    conveniences to get the status and result of the selected job.
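
    As a rough illustration of the proxy idea, here is a plain-Python sketch
    (not the real zc.async implementation; ``Failure`` below is a stand-in for
    twisted.python.failure.Failure, and selection and execution are collapsed
    into ``__call__`` for brevity, where the real proxy exposes ``getJob``):

```python
# Minimal sketch of the success/failure callback-proxy idea: the proxy
# holds two candidate callbacks and only selects one as the "real" job
# once the result is known, so no intermediate job is run or logged.

class Failure(Exception):
    """Stand-in for twisted.python.failure.Failure."""

class CallbackProxy:
    def __init__(self, success=None, failure=None):
        self.success = success    # convenience: look at the success callback
        self.failure = failure    # convenience: look at the failure callback
        self.job = None           # set once the proxy is evaluated

    def __call__(self, result):
        # Select the success or failure callback based on the result,
        # remember the selection as ``job``, and run it.
        selected = self.failure if isinstance(result, Failure) else self.success
        self.job = selected
        self.result = selected(result) if selected is not None else result
        return self.result

proxy = CallbackProxy(success=lambda r: r * 2,
                      failure=lambda f: 'handled')
print(proxy(21))              # selects the success branch -> 42
print(proxy.job is proxy.success)
```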
  
  - Add ``parallel`` and ``serial`` convenience functions to zc.async.job to make
    it trivial to schedule and process decomposed jobs.
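
    The gist of the two helpers can be sketched in plain Python (no zc.async
    or ZODB here; note that the real functions schedule everything through the
    queue and hand *jobs*, not bare results, to ``postprocess``):

```python
# Sketch of the serial/parallel decomposition pattern: run the component
# jobs one after another (serial) or side by side (parallel), then hand
# all the results to a postprocess step for the final answer.

from concurrent.futures import ThreadPoolExecutor

def serial(*jobs, postprocess=None):
    results = [job() for job in jobs]              # one after another
    return postprocess(*results) if postprocess else results

def parallel(*jobs, postprocess=None):
    with ThreadPoolExecutor() as pool:             # side by side
        results = list(pool.map(lambda j: j(), jobs))
    return postprocess(*results) if postprocess else results

print(serial(lambda: 0, lambda: 1, lambda: 2,
             postprocess=lambda *r: r))            # (0, 1, 2)
print(parallel(lambda: 7, lambda: 14, lambda: 21,
               postprocess=lambda *r: sum(r)))     # 42
```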
  
  - Add ``start`` convenience function to zc.async.configure to make it trivial
    to start up a common-case configuration of a zc.async dispatcher.  NEEDS TEST
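
    The shape of this convenience is: register all the needed subscribers
    first, then fire a database-opened event so each one does its part.  A toy
    sketch of that pattern (the registry and event below are stand-ins for
    zope.component and zope.event; parameter names follow the new function's
    signature):

```python
# Sketch of the "one-call startup" pattern behind configure.start():
# install handlers for queue, dispatcher, and agent, then announce that
# the database is open so every handler runs.

handlers = []   # stand-in for the zope.component handler registry
log = []

class DatabaseOpened:
    def __init__(self, db):
        self.db = db

def provideHandler(handler):
    handlers.append(handler)

def notify(event):                     # stand-in for zope.event.notify
    for handler in handlers:
        handler(event)

def start(db, poll_interval=5, size=3):
    # register everything first, then announce the database
    provideHandler(lambda ev: log.append('queue installed'))
    provideHandler(lambda ev: log.append(
        'dispatcher polling every %ss' % poll_interval))
    provideHandler(lambda ev: log.append(
        "agent 'main' installed, size %s" % size))
    notify(DatabaseOpened(db))

start('my-db')
print(log)
```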
  
  - No longer use protected attributes of callbacks in ``resumeCallbacks``.
  
  - The "local" code is now moved out from the dispatcher module to
    threadlocal.  This is to recognize that the local code is now modified
    outside of the dispatcher module, as described in the next bullet.
  
  - Jobs, when called, are responsible for setting the "local" job value.  This
    means that zc.async.local.getJob() always returns the currently running job,
    whether it is a top-level job (as before) or a callback (now).
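
    A plain-Python sketch of that thread-local contract (hypothetical
    stand-ins; the real code lives in the new zc.async.threadlocal module, and
    the value is set in ``Job.__call__``):

```python
# Sketch of the thread-local "local": each job sets itself as the
# current job while it runs, so local.getJob() returns whichever job
# (top-level or callback) is executing in this thread.

import threading

class Local(threading.local):
    job = None
    def getJob(self):
        return self.job

local = Local()

class Job:
    def __init__(self, callable):
        self.callable = callable
    def __call__(self, *args):
        previous, local.job = local.job, self   # the job sets the local value
        try:
            return self.callable(*args)
        finally:
            local.job = previous                # restored when the job ends

def work():
    # inside a running job, getJob() is that job
    return local.getJob()

job = Job(work)
print(job() is job)    # True
print(local.getJob())  # None again after the job finishes
```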
  
  
  

Changed:
  U   zc.async/trunk/setup.py
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/README.txt
  U   zc.async/trunk/src/zc/async/README_2.txt
  U   zc.async/trunk/src/zc/async/__init__.py
  U   zc.async/trunk/src/zc/async/configure.py
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/ftesting.py
  U   zc.async/trunk/src/zc/async/interfaces.py
  U   zc.async/trunk/src/zc/async/job.py
  U   zc.async/trunk/src/zc/async/job.txt
  U   zc.async/trunk/src/zc/async/queue.py
  U   zc.async/trunk/src/zc/async/testing.py
  A   zc.async/trunk/src/zc/async/threadlocal.py
  U   zc.async/trunk/src/zc/async/tips.txt

-=-
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/setup.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -71,7 +71,7 @@
 
 setup(
     name='zc.async',
-    version='1.4.0a1',
+    version='1.4.0b1',
     packages=find_packages('src'),
     package_dir={'':'src'},
     zip_safe=False,

Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-07-29 20:13:31 UTC (rev 88979)
@@ -5,7 +5,9 @@
   3.4.2 or newer.  Also added a summary section at the beginning of that file.
 
 - Added logging of critical messages to __stdout__ for ``ftesting.setUp``.
-  This can help discovering problems in callback transactions.
+  This can help in discovering problems in callback transactions.  This uses a
+  new helper function, ``print_logs``, in zc.async.testing, which is primarily
+  intended for quick-and-dirty debugging.
 
 - Changed testing.wait_for_result and testing.wait_for_annotation to ignore
   ReadConflictErrors, so they can be used more reliably in tests that use
@@ -13,6 +15,48 @@
 
 - Support <type 'builtin_function_or_method'> for adaptation to Job.
 
+- Add warning about long commits to tips and tricks.
+
+- If a deactivated polling dispatcher turns out not to be dead after all,
+  log a complaint and then reactivate it.  NEEDS TEST
+
+- No longer use an intermediate job to implement the success/failure
+  addCallbacks behavior.  Introduce an ICallbackProxy that can be used for
+  this kind of behavior instead.  This change was driven by two desires.
+
+  - Don't log the intermediate result.  Logging it makes logs harder to
+    read: pertinent data is duplicated, and hidden within unimportant
+    differences between log entries.
+
+  - Don't needlessly remember errors in success/failure callbacks.  Retaining
+    them can cause spurious failures in unusual situations.
+
+  The callback proxy accepts callbacks, which are added to the selected job
+  (success or failure) when the job is selected.
+
+  This change introduces some hopefully trivial incompatibilities, which
+  basically come down to the callback being a proxy, not a real job. Use the
+  convenience properties ``success`` and ``failure`` on the proxy to look at
+  the respective jobs. After the proxy is evaluated, the ``job`` attribute
+  will hold the job that was actually run. ``status`` and ``result`` are
+  conveniences to get the status and result of the selected job.
+
+- Add ``parallel`` and ``serial`` convenience functions to zc.async.job to make
+  it trivial to schedule and process decomposed jobs.
+
+- Add ``start`` convenience function to zc.async.configure to make it trivial
+  to start up a common-case configuration of a zc.async dispatcher.  NEEDS TEST
+
+- No longer use protected attributes of callbacks in ``resumeCallbacks``.
+
+- The "local" code is now moved out from the dispatcher module to
+  threadlocal.  This is to recognize that the local code is now modified
+  outside of the dispatcher module, as described in the next bullet.
+
+- Jobs, when called, are responsible for setting the "local" job value.  This
+  means that zc.async.local.getJob() always returns the currently running job,
+  whether it is a top-level job (as before) or a callback (now).
+
 1.3 (2008-07-04)
 ================
 

Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/README.txt	2008-07-29 20:13:31 UTC (rev 88979)
@@ -742,19 +742,52 @@
     >>> transaction.commit()
     >>> reactor.wait_for(job, attempts=3)
     TIME OUT
+    >>> len(agent)
+    1
     >>> reactor.wait_for(job, attempts=3)
     >>> job.result
     42
 
+The job is now out of the agent.
+
+    >>> len(agent)
+    0
+
 The second_job could also have returned a job, allowing for additional
 legs.  Once the last job returns a real result, it will cascade through the
 past jobs back up to the original one.
 
 A different approach could have used callbacks.  Using callbacks can be
 somewhat more complicated to follow, but can allow for a cleaner
-separation of code: dividing code that does work from code that
-orchestrates the jobs.  We'll see an example of the idea below.
+separation of code: dividing code that does work from code that orchestrates
+the jobs. The ``serial`` helper function in the job module uses this pattern.
+Here's a quick example of the helper function [#define_longer_wait]_.
 
+    >>> def job_zero():
+    ...     return 0
+    ...
+    >>> def job_one():
+    ...     return 1
+    ...
+    >>> def job_two():
+    ...     return 2
+    ...
+    >>> def postprocess(zero, one, two):
+    ...     return zero.result, one.result, two.result
+    ...
+    >>> job = queue.put(zc.async.job.serial(job_zero, job_one, job_two,
+    ...                                     postprocess=postprocess))
+    >>> transaction.commit()
+
+    >>> wait_repeatedly()
+    ... # doctest: +ELLIPSIS
+    TIME OUT...
+
+    >>> job.result
+    (0, 1, 2)
+
+The ``parallel`` example we use below follows a similar pattern.
+
 Parallelized Work
 -----------------
 
@@ -767,7 +800,7 @@
 
 First, we'll define the jobs that do work.  ``job_A``, ``job_B``, and
 ``job_C`` will be jobs that can be done in parallel, and
-``post_process`` will be a function that assembles the job results for a
+``postprocess`` will be a function that assembles the job results for a
 final result.
 
     >>> def job_A():
@@ -782,25 +815,53 @@
     ...     # imaginary work...
     ...     return 21
     ...
-    >>> def post_process(*args):
+    >>> def postprocess(*jobs):
     ...     # this callable represents one that needs to wait for the
     ...     # parallel jobs to be done before it can process them and return
     ...     # the final result
-    ...     return sum(args)
+    ...     return sum(job.result for job in jobs)
     ...
 
-Now this code works with jobs to get everything done.  Note, in the
-callback function, that mutating the same object we are checking
-(job.args) is the way we are enforcing necessary serializability
-with MVCC turned on.
+This can be handled by a convenience function, ``parallel``, that will arrange
+everything for you.
 
+    >>> job = queue.put(zc.async.job.parallel(
+    ...     job_A, job_B, job_C, postprocess=postprocess))
+    >>> transaction.commit()
+
+Now we just wait for the result.
+
+    >>> wait_repeatedly()
+    ... # doctest: +ELLIPSIS
+    TIME OUT...
+
+    >>> job.result
+    42
+
+Ta-da!
+
+Now, how did this work?  Let's look at a simple implementation directly.  We'll
+use a slightly different ``postprocess`` that expects results directly rather
+than jobs.
+
+    >>> def postprocess(*results):
+    ...     # this callable represents one that needs to wait for the
+    ...     # parallel jobs to be done before it can process them and return
+    ...     # the final result
+    ...     return sum(results)
+    ...
+
+This code works with jobs to get everything done. Note, in the callback
+function, that mutating the same object we are checking (job.args) is the way
+we are enforcing necessary serializability with MVCC turned on.
+
     >>> def callback(job, result):
     ...     job.args.append(result)
     ...     if len(job.args) == 3: # all results are in
     ...         zc.async.local.getJob().queue.put(job)
     ...
     >>> def main_job():
-    ...     job = zc.async.job.Job(post_process)
+    ...     job = zc.async.job.Job(postprocess)
     ...     queue = zc.async.local.getJob().queue
     ...     for j in (job_A, job_B, job_C):
     ...         queue.put(j).addCallback(
@@ -816,21 +877,20 @@
 
     >>> job = queue.put(main_job)
     >>> transaction.commit()
-    >>> for i in range(10):
-    ...     reactor.wait_for(job, attempts=3)
-    ...     if job.status == zc.async.interfaces.COMPLETED:
-    ...         break
-    ... else:
-    ...     assert False, 'never completed'
+
+    >>> wait_repeatedly()
     ... # doctest: +ELLIPSIS
     TIME OUT...
     >>> job.result
     42
 
-Ta-da!
+Once again, ta-da!
 
 For real-world usage, you'd also probably want to deal with the possibility of
-one or more of the jobs generating a Failure, among other edge cases.
+one or more of the jobs generating a Failure, among other edge cases.  The
+``parallel`` function introduced above helps you handle this by returning
+jobs, rather than results, so you can analyze what went wrong and try to handle
+it.
 
 -------------------
 Returning Deferreds
@@ -1223,6 +1283,20 @@
     serial because of a quota, no other worker should be trying to work on
     those jobs.
 
+    Alternatively, you could use a standalone, non-zc.async queue of things to
+    do, and have the zc.async job just pull from that queue.  You might use
+    zc.queue for this stand-alone queue, or zc.catalogqueue.
+
+.. [#define_longer_wait]
+    >>> def wait_repeatedly():
+    ...     for i in range(10):
+    ...         reactor.wait_for(job, attempts=3)
+    ...         if job.status == zc.async.interfaces.COMPLETED:
+    ...             break
+    ...     else:
+    ...         assert False, 'never completed'
+    ...
+
 .. [#stop_usage_reactor]
 
     >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
@@ -1233,9 +1307,9 @@
      'shortest active': None,
      'shortest failed': (..., 'unnamed'),
      'shortest successful': (..., 'unnamed'),
-     'started': 24,
+     'started': 34,
      'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
      'statistics start': datetime.datetime(2006, 8, 10, 15, ...),
-     'successful': 22,
+     'successful': 32,
      'unknown': 0}
     >>> reactor.stop()

Modified: zc.async/trunk/src/zc/async/README_2.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_2.txt	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/README_2.txt	2008-07-29 20:13:31 UTC (rev 88979)
@@ -383,6 +383,7 @@
 
 Now we are ready to instantiate our dispatcher.
 
+    >>> import zc.async.dispatcher
     >>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
 
 Notice it has the uuid defined in instanceuuid.

Modified: zc.async/trunk/src/zc/async/__init__.py
===================================================================
--- zc.async/trunk/src/zc/async/__init__.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/__init__.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -11,4 +11,4 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-from zc.async.dispatcher import local
+from zc.async.threadlocal import local

Modified: zc.async/trunk/src/zc/async/configure.py
===================================================================
--- zc.async/trunk/src/zc/async/configure.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/configure.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -15,11 +15,17 @@
 
 import zc.twist
 import zope.component
+import zope.event
+import zope.component.event # yuck; as of this writing, this import causes the
+                            # zope.component hook to be installed in
+                            # zope.event.
 import ZODB.interfaces
 
 import zc.async.interfaces
 import zc.async.job
+import zc.async.queue
 import zc.async.instanceuuid
+import zc.async.subscribers
 
 # These functions accomplish what configure.zcml does; you don't want both
 # to be in play (the component registry will complain).
@@ -61,3 +67,19 @@
     # see comment in ``minimal``, above
     minimal()
     zope.component.provideAdapter(zc.twist.connection)
+
+# this function installs a queue named '' (empty string), starts the
+# dispatcher, and installs an agent named 'main', with default values.
+# It is a convenience for quick starts.
+def start(db, poll_interval=5, db_name=None, chooser=None, size=3):
+    zope.component.provideAdapter(zc.async.queue.getDefaultQueue)
+    zope.component.provideHandler(
+        zc.async.subscribers.QueueInstaller(db_name=db_name))
+    zope.component.provideHandler(
+        zc.async.subscribers.ThreadedDispatcherInstaller(
+            poll_interval=poll_interval))
+    zope.component.provideHandler(
+        zc.async.subscribers.AgentInstaller('main',
+                                            chooser=chooser,
+                                            size=size))
+    zope.event.notify(zc.async.interfaces.DatabaseOpened(db))

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -30,85 +30,11 @@
 import zope.bforest.periodic
 import zc.twist
 
+import zc.async
 import zc.async.utils
 import zc.async.interfaces
 
-def _get(reactor, job, name, default, timeout, poll, deferred, start=None):
-    now = time.time()
-    if start is None:
-        start = now
-    if name in job.annotations:
-        res = job.annotations[name]
-    elif start + timeout < now:
-        res = default
-    else:
-        partial = zc.twist.Partial(
-            _get, reactor, job, name, default, timeout, poll, deferred,
-            start)
-        partial.setReactor(reactor)
-        reactor.callLater(min(poll, start + timeout - now), partial)
-        return
-    deferred.setResult(res)
 
-class Result(object):
-
-    result = None
-
-    def __init__(self):
-        self._event = threading.Event()
-
-    def setResult(self, value):
-        self.result = value
-        self._event.set()
-
-    def wait(self, *args):
-        self._event.wait(*args)
-
-class Local(threading.local):
-
-    job = None
-    dispatcher = None
-
-    def getJob(self):
-        return self.job
-
-    def getQueue(self):
-        return self.job.queue
-
-    def getDispatcher(self):
-        return self.dispatcher
-
-    def getReactor(self):
-        return self.dispatcher.reactor
-
-    def setLiveAnnotation(self, name, value, job=None):
-        if self.job is None or self.dispatcher.reactor is None:
-            raise ValueError('not initialized')
-        if job is None:
-            job = self.job
-        partial = zc.twist.Partial(
-            job.annotations.__setitem__, name, value)
-        partial.setReactor(self.dispatcher.reactor)
-        self.dispatcher.reactor.callFromThread(partial)
-
-    def getLiveAnnotation(self, name, default=None, timeout=0,
-                          poll=1, job=None):
-        if self.job is None or self.dispatcher.reactor is None:
-            raise ValueError('not initialized')
-        if job is None:
-            job = self.job
-        deferred = Result()
-        partial = zc.twist.Partial(
-            _get, self.dispatcher.reactor, job, name, default, timeout, poll,
-            deferred)
-        partial.setReactor(self.dispatcher.reactor)
-        self.dispatcher.reactor.callFromThread(partial)
-        deferred.wait(timeout+2)
-        return deferred.result
-
-local = Local()
-
-
 class PollInfo(dict):
     key = None
     @property
@@ -135,7 +61,7 @@
         return self._size
 
     def perform_thread(self):
-        local.dispatcher = self.dispatcher
+        zc.async.local.dispatcher = self.dispatcher
         conn = self.dispatcher.db.open()
         try:
             job_info = self.queue.get()
@@ -160,7 +86,7 @@
                             # this setstate should trigger any initial problems
                             # within the try/except retry structure here.
                             local_conn.setstate(job)
-                            local.job = job
+                            # this is handled in job.__call__: local.job = job
                         except ZEO.Exceptions.ClientDisconnected:
                             zc.async.utils.log.info(
                                 'ZEO client disconnected while trying to '
@@ -239,7 +165,7 @@
                             info['result'][:10000] + '\n[...TRUNCATED...]')
                     info['completed'] = datetime.datetime.utcnow()
                 finally:
-                    local.job = None
+                    zc.async.local.job = None # also in job (here for paranoia)
                     transaction.abort()
                 zc.async.utils.tracelog.info(
                     'completed in thread %d: %s',

Modified: zc.async/trunk/src/zc/async/ftesting.py
===================================================================
--- zc.async/trunk/src/zc/async/ftesting.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/ftesting.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -36,14 +36,11 @@
         # zc.async calls.  Of course, if your test
         # intentionally generates CRITICAL log messages, you may not want this;
         # pass ``log_file=None`` to setUp.
-        logger = logging.getLogger('zc.async')
         # stashing this on the dispatcher is a hack, but at least we're doing
         # it on code from the same package.
-        dispatcher._debug_handler = logging.StreamHandler(log_file)
-        logger.setLevel(log_level)
-        logger.addHandler(dispatcher._debug_handler)
+        dispatcher._debug_handler = zc.async.testing.print_logs(
+            log_file, log_level)
 
-
 def tearDown():
     dispatcher = zc.async.dispatcher.get()
     if getattr(dispatcher, '_debug_handler', None) is not None:

Modified: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/interfaces.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -173,26 +173,11 @@
     This is almost certainly a programmer error."""
 
 
-class IJob(zope.interface.Interface):
+class IAbstractJob(zope.interface.Interface):
 
     parent = zope.interface.Attribute(
         """The current canonical location of the job""")
 
-    callable = zope.interface.Attribute(
-        """The callable object that should be called with *IJob.args and
-        **IJob.kwargs when the IJob is called.  Mutable.""")
-
-    args = zope.interface.Attribute(
-        """a peristent list of the args that should be applied to self.call.
-        May include persistent objects (though note that, if passing a method
-        is desired, it will typicall need to be wrapped in an IJob).""")
-
-    kwargs = zope.interface.Attribute(
-        """a persistent mapping of the kwargs that should be applied to
-        self.call.  May include persistent objects (though note that, if
-        passing a method is desired, it will typicall need to be wrapped
-        in an IJob).""")
-
     status = zope.interface.Attribute(
         """One of constants defined in zc.async.interfaces:
         NEW, PENDING, ASSIGNED, ACTIVE, CALLBACKS, COMPLETED.
@@ -209,13 +194,6 @@
         be None.  When COMPLETED, will be a twisted.python.failure.Failure
         describing the call failure or the successful result.""")
 
-    callbacks = zope.interface.Attribute(
-        """A mutable persistent list of the callback jobs added by
-        addCallbacks.""")
-
-    annotations = zope.interface.Attribute(
-        """An OOBTree that is available for metadata use.""")
-
     def addCallbacks(success=None, failure=None):
         """if success or failure is not None, adds a callback job to
         self.callbacks and returns the job.  Otherwise returns self.
@@ -231,6 +209,41 @@
         with the result of this job.  If callback is already in
         COMPLETED state then the callback will be performed immediately."""
 
+    callbacks = zope.interface.Attribute(
+        """A mutable persistent list of the callback jobs added by
+        addCallbacks.""")
+
+
+class ICallbackProxy(IAbstractJob):
+    """A proxy for jobs."""
+
+    job = zope.interface.Attribute(
+        """None, before ``getJob``, then the job calculated by ``getJob``""")
+
+    def getJob(result):
+        """Get the job for the given result."""
+
+
+class IJob(IAbstractJob):
+
+    callable = zope.interface.Attribute(
+        """The callable object that should be called with *IJob.args and
+        **IJob.kwargs when the IJob is called.  Mutable.""")
+
+    args = zope.interface.Attribute(
+        """a peristent list of the args that should be applied to self.call.
+        May include persistent objects (though note that, if passing a method
+        is desired, it will typicall need to be wrapped in an IJob).""")
+
+    kwargs = zope.interface.Attribute(
+        """a persistent mapping of the kwargs that should be applied to
+        self.call.  May include persistent objects (though note that, if
+        passing a method is desired, it will typicall need to be wrapped
+        in an IJob).""")
+
+    annotations = zope.interface.Attribute(
+        """An OOBTree that is available for metadata use.""")
+
     def __call__(*args, **kwargs):
         """call the callable.  Any given args are effectively appended to
         self.args for the call, and any kwargs effectively update self.kwargs

Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/job.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -33,6 +33,7 @@
 
 import zc.async.interfaces
 import zc.async.utils
+import zc.async
 
 def _repr(obj):
     if isinstance(obj, persistent.Persistent):
@@ -55,6 +56,8 @@
     else:
         return repr(obj)
 
+# this is kept so that legacy databases can keep their references to this
+# function
 def success_or_failure(success, failure, res):
     callable = None
     if isinstance(res, twisted.python.failure.Failure):
@@ -197,7 +200,120 @@
         res = RetryCommonForever(job)
     return res
 
+def isFailure(value):
+    return isinstance(value, twisted.python.failure.Failure)
 
+def _prepare_callback(callback, failure_log_level=None,
+                      retry_policy_factory=None, parent=None):
+    if not zc.async.interfaces.ICallbackProxy.providedBy(callback):
+        callback = zc.async.interfaces.IJob(callback)
+        if failure_log_level is not None:
+            callback.failure_log_level = failure_log_level
+        elif callback.failure_log_level is None:
+            callback.failure_log_level = logging.CRITICAL
+        if retry_policy_factory is not None:
+            callback.retry_policy_factory = retry_policy_factory
+        elif callback.retry_policy_factory is None:
+            callback.retry_policy_factory = callback_retry_policy_factory
+    callback.parent = parent
+    return callback
+
+class ConditionalCallbackProxy(zc.async.utils.Base):
+
+    zope.interface.implements(zc.async.interfaces.ICallbackProxy)
+
+    job = None
+
+    @property
+    def status(self):
+        # NEW -> (PENDING -> ASSIGNED ->) ACTIVE -> CALLBACKS -> COMPLETED
+        if self.job is None:
+            ob = self.parent
+            while (ob is not None and
+                   zc.async.interfaces.IJob.providedBy(ob)):
+                ob = ob.parent
+            if zc.async.interfaces.IAgent.providedBy(ob):
+                return zc.async.interfaces.ASSIGNED
+            elif zc.async.interfaces.IQueue.providedBy(ob):
+                return zc.async.interfaces.PENDING
+            return zc.async.interfaces.NEW
+        return self.job.status
+
+    @property
+    def result(self):
+        if self.job is None:
+            return None
+        return self.job.result
+
+    def __init__(self, *args, **kwargs):
+        kwargs['parent'] = self
+        default = None
+        if not args:
+            pass
+        elif args[-1] is None:
+            args = args[:-1]
+        elif getattr(args[-1], '__len__', None) is None:
+            default = _prepare_callback(args[-1], **kwargs)
+            args = args[:-1]
+        self.default = default
+        self.conditionals = persistent.list.PersistentList()
+        for condition, job in args:
+            if job is not None:
+                job = _prepare_callback(job, **kwargs)
+            self.conditionals.append((condition, job))
+        self.callbacks = zc.queue.PersistentQueue()
+
+    def getJob(self, result):
+        if self.job is None:
+            for condition, callable in self.conditionals:
+                if condition(result):
+                    break
+            else:
+                callable = self.default
+            if callable is None:
+                callable = _prepare_callback(_transparent, None, None, self)
+            self.job = callable
+        else:
+            callable = self.job
+        while self.callbacks:
+            callable.addCallback(self.callbacks.pull())
+        return callable
+
+    def addCallbacks(self, success=None, failure=None,
+                     failure_log_level=None, retry_policy_factory=None):
+        return self.addCallback(SuccessFailureCallbackProxy(
+            success, failure,
+            failure_log_level=failure_log_level,
+            retry_policy_factory=retry_policy_factory))
+
+    def addCallback(self, callback, failure_log_level=None,
+                    retry_policy_factory=None):
+        callback = _prepare_callback(
+            callback, failure_log_level, retry_policy_factory, self)
+        if self.job is None:
+            self.callbacks.put(callback)
+        else:
+            self.job.addCallback(callback)
+        return callback
+
+
+class SuccessFailureCallbackProxy(ConditionalCallbackProxy):
+
+    @property
+    def success(self):
+        return self.default
+
+    @property
+    def failure(self):
+        return self.conditionals[0][1]
+
+    def __init__(self, success, failure, failure_log_level=None,
+                 retry_policy_factory=None):
+        super(SuccessFailureCallbackProxy, self).__init__(
+            (isFailure, failure), success,
+            failure_log_level=failure_log_level,
+            retry_policy_factory=retry_policy_factory)
+
 class Job(zc.async.utils.Base):
 
     zope.interface.implements(zc.async.interfaces.IJob)
@@ -381,62 +497,34 @@
             self._callable_name = value.__name__
         else:
             self._callable_root, self._callable_name = value, None
-        if zc.async.interfaces.IJob.providedBy(self._callable_root):
+        if (zc.async.interfaces.IJob.providedBy(self._callable_root) and
+            self._callable_root.parent is None):
+            # if the parent is already set, that is probably an agent or
+            # something like that.  Don't override, or else the agent won't
+            # get cleaned out.
             self._callable_root.parent = self
 
     def addCallbacks(self, success=None, failure=None,
                      failure_log_level=None, retry_policy_factory=None):
-        if success is not None or failure is not None:
-            if success is not None:
-                success = zc.async.interfaces.IJob(success)
-                if failure_log_level is not None:
-                    success.failure_log_level = failure_log_level
-                elif success.failure_log_level is None:
-                    success.failure_log_level = logging.CRITICAL
-                if retry_policy_factory is not None:
-                    success.retry_policy_factory = retry_policy_factory
-                elif success.retry_policy_factory is None:
-                    success.retry_policy_factory = (
-                        callback_retry_policy_factory)
-            if failure is not None:
-                failure = zc.async.interfaces.IJob(failure)
-                if failure_log_level is not None:
-                    failure.failure_log_level = failure_log_level
-                elif failure.failure_log_level is None:
-                    failure.failure_log_level = logging.CRITICAL
-                if retry_policy_factory is not None:
-                    failure.retry_policy_factory = retry_policy_factory
-                elif failure.retry_policy_factory is None:
-                    failure.retry_policy_factory = (
-                        callback_retry_policy_factory)
-            res = Job(success_or_failure, success, failure)
-            if success is not None:
-                success.parent = res
-            if failure is not None:
-                failure.parent = res
-            self.addCallback(res)
-        else:
-            res = self
-        return res
+        return self.addCallback(SuccessFailureCallbackProxy(
+            success, failure,
+            failure_log_level=failure_log_level,
+            retry_policy_factory=retry_policy_factory))
 
     def addCallback(self, callback, failure_log_level=None,
                     retry_policy_factory=None):
-        callback = zc.async.interfaces.IJob(callback)
+        callback = _prepare_callback(
+            callback, failure_log_level, retry_policy_factory, self)
         self.callbacks.put(callback)
-        callback.parent = self
         if self._status == zc.async.interfaces.COMPLETED:
-            callback(self.result) # this commits transactions!
+            if zc.async.interfaces.ICallbackProxy.providedBy(callback):
+                call = callback.getJob(self.result)
+            else:
+                call = callback
+            call(self.result) # this commits transactions!
         else:
             self._p_changed = True # to try and fire conflict errors if
             # our reading of self.status has changed beneath us
-        if failure_log_level is not None:
-            callback.failure_log_level = failure_log_level
-        elif callback.failure_log_level is None:
-            callback.failure_log_level = logging.CRITICAL
-        if retry_policy_factory is not None:
-            callback.retry_policy_factory = retry_policy_factory
-        elif callback.retry_policy_factory is None:
-            callback.retry_policy_factory = callback_retry_policy_factory
         return callback
 
     def getRetryPolicy(self):
@@ -503,6 +591,7 @@
         data_cache = {}
         res = None
         while 1:
+            zc.async.local.job = self # we do this in the loop for paranoia
             try:
                 setup_info = self.setUp()
                 res = self.callable(*effective_args, **effective_kwargs)
@@ -527,6 +616,7 @@
                     if self is zc.async.utils.never_fail(
                         lambda: self._reschedule(retry, data_cache),
                         identifier, tm):
+                        zc.async.local.job = None
                         return self
                 elif retry:
                     continue
@@ -554,6 +644,7 @@
                     if self is zc.async.utils.never_fail(
                         lambda: self._reschedule(retry, data_cache),
                         identifier, tm):
+                        zc.async.local.job = None
                         return self
                 elif retry:
                     continue
@@ -593,6 +684,7 @@
                 self._log_completion(res)
                 identifier = 'performing callbacks of %r' % (self,)
                 zc.async.utils.never_fail(self.resumeCallbacks, identifier, tm)
+            zc.async.local.job = None
             return res
 
     def handleInterrupt(self):
@@ -682,6 +774,7 @@
         callback = True
         if zc.async.interfaces.IJob.providedBy(res):
             res.addCallback(self._callback)
+            self._result = res # temporary
             callback = False
         elif isinstance(res, twisted.internet.defer.Deferred):
             partial = zc.twist.Partial(self._callback)
@@ -727,6 +820,77 @@
         if callback:
             self.resumeCallbacks()
 
+    def handleCallbackInterrupt(self, caller):
+        if self._status != zc.async.interfaces.ACTIVE:
+            raise zc.async.interfaces.BadStatusError(
+                'can only handleCallbackInterrupt on a job with ACTIVE status')
+        if caller.status != zc.async.interfaces.CALLBACKS:
+            raise zc.async.interfaces.BadStatusError(
+                'can only handleCallbackInterrupt with caller in CALLBACKS '
+                'status')
+        result = caller.result
+        if self.result is not None:
+            if not zc.async.interfaces.IJob.providedBy(self.result):
+                msg = ('Callback %r is in an apparently insane state: result '
+                       'has been set (%r), the result is not a job, and yet '
+                       'the status is ACTIVE.  This should not be possible.  ')
+                if self.result == result:
+                    zc.async.utils.log.error(
+                        msg + 'Stored result is equivalent to currently '
+                        'received result, so will '
+                        'change status to CALLBACKS and '
+                        'run callbacks, for no clear "right" action.',
+                        self, self.result)
+                    self._status = zc.async.interfaces.CALLBACKS
+                    self._active_end = datetime.datetime.now(pytz.UTC)
+                    self.resumeCallbacks()
+                    return
+                else:
+                    zc.async.utils.log.error(
+                        msg + 'Stored result is not equivalent to currently '
+                        'received result (%r), so will '
+                        '(re?)run this job with new result, for no clear '
+                        '"right" action.',
+                        self, self.result, result)
+                    # fall through
+            elif self.result.status == zc.async.interfaces.COMPLETED:
+                zc.async.utils.log.warning(
+                    'Callback %r is in an apparently insane state: inner job '
+                    'result has been completed, including callbacks, but '
+                    'this job has not been '
+                    'completed.  This should not be possible.  Will set '
+                    'result and run callbacks, for no clear "right" action.',
+                    self)
+                callback = self._set_result(self.result.result)
+                self._log_completion(self.result.result)
+                if callback:
+                    self.resumeCallbacks()
+                return
+            else:
+                return # we are going to hope that the job works; it should,
+                # and there's no way for us to know that it won't here.
+        tm = transaction.interfaces.ITransactionManager(self)
+        retry = self._getRetry('interrupted', tm)
+        istime = isinstance(
+            retry, (datetime.timedelta, datetime.datetime))
+        if istime:
+            zc.async.utils.log.error(
+                'error for IRetryPolicy %r on %r: '
+                'cannot reschedule a callback, only retry.  '
+                'We will retry now, for no clear "right" action.',
+                self.getRetryPolicy(), self)
+        if retry or istime:
+            zc.async.utils.tracelog.debug(
+                'retrying interrupted callback '
+                '%r to %r', self, caller)
+            self._status = zc.async.interfaces.NEW
+            self._active_start = None
+            self(result)
+        else:
+            zc.async.utils.tracelog.debug(
+                'aborting interrupted callback '
+                '%r to %r', self, caller)
+            self.fail(zc.async.interfaces.AbortedError())
+
     def resumeCallbacks(self):
         # should be called within a job that has a RetryCommonForever policy
         if self._status != zc.async.interfaces.CALLBACKS:
@@ -737,8 +901,12 @@
         length = 0
         while 1:
             for j in callbacks:
-                # TODO yuck: this mucks in callbacks' protected bits
-                if j._status == zc.async.interfaces.NEW:
+                if zc.async.interfaces.ICallbackProxy.providedBy(j):
+                    j = j.getJob(self.result)
+                status = j.status
+                if status in (zc.async.interfaces.NEW,
+                              zc.async.interfaces.ASSIGNED,
+                              zc.async.interfaces.PENDING):
                     if (j.begin_by is not None and
                         (j.begin_after + j.begin_by) <
                         datetime.datetime.now(pytz.UTC)):
@@ -749,28 +917,9 @@
                         zc.async.utils.tracelog.debug(
                             'starting callback %r to %r', j, self)
                         j(self.result)
-                elif j._status == zc.async.interfaces.ACTIVE:
-                    retry = j._getRetry('interrupted', tm)
-                    istime = isinstance(
-                        retry, (datetime.timedelta, datetime.datetime))
-                    if istime:
-                        zc.async.utils.log.error(
-                            'error for IRetryPolicy %r on %r: '
-                            'cannot reschedule a callback, only retry',
-                            j.getRetryPolicy(), j)
-                    if retry or istime:
-                        zc.async.utils.tracelog.debug(
-                            'retrying interrupted callback '
-                            '%r to %r', j, self)
-                        j._status = zc.async.interfaces.NEW
-                        j._active_start = None
-                        j(self.result)
-                    else:
-                        zc.async.utils.tracelog.debug(
-                            'aborting interrupted callback '
-                            '%r to %r', j, self)
-                        j.fail(zc.async.interfaces.AbortedError())
-                elif j._status == zc.async.interfaces.CALLBACKS:
+                elif status == zc.async.interfaces.ACTIVE:
+                    j.handleCallbackInterrupt(self)
+                elif status == zc.async.interfaces.CALLBACKS:
                     j.resumeCallbacks()
                 # TODO: this shouldn't raise anything we want to catch, right?
                 # now, this should catch all the errors except EXPLOSIVE_ERRORS
@@ -787,3 +936,58 @@
                     self.parent.jobCompleted(self)
                 tm.commit()
                 return
+
+# conveniences for serial and parallel jobs
+
+def _transparent(*results):
+    return results
+
+def _serial_or_parallel(scheduler, jobs, kw):
+    if kw and (len(kw) > 1 or kw.keys()[0] != 'postprocess'):
+        raise TypeError('only accepts one keyword argument, ``postprocess``')
+    postprocess = zc.async.interfaces.IJob(kw.get('postprocess', _transparent))
+    result = Job(scheduler,
+                 *(zc.async.interfaces.IJob(j) for j in jobs),
+                 **dict(postprocess=postprocess))
+    postprocess.args = result.args # ...I guess this means I bless this muck
+    return result
+
+def _queue_next(main_job, ix=0, ignored_result=None):
+    jobs = main_job.args
+    queue = main_job.queue
+    if ix < len(jobs):
+        next = jobs[ix]
+        queue.put(next)
+        next.addCallback(Job(_queue_next, main_job, ix+1))
+    else:
+        queue.put(main_job.kwargs['postprocess'])
+
+def _schedule_serial(*jobs, **kw):
+    _queue_next(zc.async.local.getJob())
+    return kw['postprocess']
+
+def serial(*jobs, **kw):
+    return _serial_or_parallel(_schedule_serial, jobs, kw)
+
+def _queue_all(main_job, ignored_result=None):
+    jobs = main_job.args
+    queue = main_job.queue
+    complete = True
+    for job in jobs:
+        status = job.status
+        if status == zc.async.interfaces.NEW:
+            queue.put(job)
+            job.addCallback(Job(_queue_all, main_job))
+            complete = False
+        elif status not in (zc.async.interfaces.COMPLETED,
+                            zc.async.interfaces.CALLBACKS):
+            complete = False
+    if complete:
+        queue.put(main_job.kwargs['postprocess'])
+
+def _schedule_parallel(*jobs, **kw):
+    _queue_all(zc.async.local.getJob())
+    return kw['postprocess']
+
+def parallel(*jobs, **kw):
+    return _serial_or_parallel(_schedule_parallel, jobs, kw)

Modified: zc.async/trunk/src/zc/async/job.txt
===================================================================
--- zc.async/trunk/src/zc/async/job.txt	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/job.txt	2008-07-29 20:13:31 UTC (rev 88979)
@@ -150,9 +150,11 @@
     >>> j.status == zc.async.interfaces.ACTIVE
     True
 
-When we call the inner job, the result will be placed on the outer job.
+While the status is ACTIVE, the result is the inner job. When we call the inner
+job, the result will be placed on the outer job.
 
-    >>> j.result # None
+    >>> j.result # doctest: +ELLIPSIS
+    <zc.async.job.Job (...) ``zc.async.doctest_test.innerCall()``>
     >>> res = ij()
     >>> j.result
     42
@@ -241,13 +243,23 @@
 =========
 
 The job object can also be used to handle return values and exceptions from the
-call. The ``addCallbacks`` method enables the functionality. Its signature is
-(success=None, failure=None). It may be called multiple times, each time adding
-a success and/or failure callable that takes the end result of the original,
-parent job: a value or a zc.async.Failure object, respectively. Failure objects
-are passed to failure callables, and any other results are passed to success
-callables.
+call. The ``addCallback`` and ``addCallbacks`` methods enable this
+functionality.
 
+``addCallback`` takes a callable or job as its primary argument, and
+returns a job on which you can add further callbacks.
+
+``addCallbacks`` has a signature beginning with (success=None,
+failure=None), where success and failure may each be a callable or a job.
+It returns a callback proxy, on which you can add callbacks that will be
+applied to the chosen job (or to a "no-op" job if the chosen value is
+``None``).
+
+The methods may be called multiple times. Each callable will receive the
+end result of the original, parent job: either a value or a
+zc.async.Failure object. If you use ``addCallbacks``, Failure objects are
+passed to failure callables, and any other results go to success callables.
+
 Note that, unlike with Twisted deferred's, the results of callbacks for a given
 job are not chained.  To chain, add a callback to the desired callback.  The
 primary reason for a Twisted callback is try:except:else:finally logic.  In
@@ -295,7 +307,7 @@
     ...     print "failure.", f
     ...
     >>> j.addCallbacks(success, failure) # doctest: +ELLIPSIS
-    <zc.async.job.Job ...>
+    <zc.async.job.SuccessFailureCallbackProxy ...>
     >>> res = j()
     success! 15
 
@@ -304,7 +316,7 @@
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, None)
     >>> transaction.commit()
     >>> j.addCallbacks(success, failure) # doctest: +ELLIPSIS
-    <zc.async.job.Job (oid ...>
+    <zc.async.job.SuccessFailureCallbackProxy ...>
     >>> res = j() # doctest: +ELLIPSIS
     failure. [Failure instance: Traceback: exceptions.TypeError...]
 
@@ -319,9 +331,9 @@
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 3)
     >>> transaction.commit()
     >>> j.addCallbacks(success) # doctest: +ELLIPSIS
-    <zc.async.job.Job (oid ...>
+    <zc.async.job.SuccessFailureCallbackProxy ...>
     >>> j.addCallbacks(also_success) # doctest: +ELLIPSIS
-    <zc.async.job.Job (oid ...>
+    <zc.async.job.SuccessFailureCallbackProxy ...>
     >>> res = j()
     success! 15
     also a success! 15
@@ -329,9 +341,9 @@
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, None)
     >>> transaction.commit()
     >>> j.addCallbacks(failure=failure) # doctest: +ELLIPSIS
-    <zc.async.job.Job (oid ...>
+    <zc.async.job.SuccessFailureCallbackProxy ...>
     >>> j.addCallbacks(failure=also_failure) # doctest: +ELLIPSIS
-    <zc.async.job.Job (oid ...>
+    <zc.async.job.SuccessFailureCallbackProxy ...>
     >>> res = j() # doctest: +ELLIPSIS
     failure. [Failure instance: Traceback: exceptions.TypeError...]
     also a failure. [Failure instance: Traceback: exceptions.TypeError...]
@@ -368,7 +380,7 @@
     >>> transaction.commit()
     >>> j.addCallbacks(zc.async.job.Job(multiply, 4)
     ...               ).addCallbacks(success) # doctest: +ELLIPSIS
-    <zc.async.job.Job (oid ...>
+    <zc.async.job.SuccessFailureCallbackProxy ...>
     >>> res = j()
     success! 60
 
@@ -381,7 +393,7 @@
     >>> transaction.commit()
     >>> j.addCallbacks(
     ...     failure=handle_failure).addCallbacks(success) # doctest: +ELLIPSIS
-    <zc.async.job.Job (oid ...>
+    <zc.async.job.SuccessFailureCallbackProxy ...>
     >>> res = j()
     success! 0
 
@@ -1567,13 +1579,16 @@
     >>> len(j.callbacks)
     1
 
-When you use ``addCallbacks``, the job you get back has a callable with
-the success and failure jobs you passed in as arguments.
+When you use ``addCallbacks``, the callback proxy you get back has ``success``
+and ``failure`` properties to show you the jobs (or ``None``) for the values
+you passed in as arguments.
 
     >>> j.callbacks[0] is j_callback
     True
-    >>> j_callback.args # doctest: +ELLIPSIS
-    [<zc.async.job.Job ... ``zc.async.doctest_test.multiply(5)``>, None]
+    >>> j_callback.success # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``...multiply(5)``>
+    >>> print j_callback.failure
+    None
 
 ``addCallback`` does not have this characteristic.
 
@@ -1774,8 +1789,7 @@
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     starting callback
      <...Job (oid ..., db 'unnamed')
-      ``...success_or_failure(...Job (oid ..., db 'unnamed'),
-                              ...Job (oid ..., db 'unnamed'))``>
+      ``...success()``>
     to
      <...Job (oid ..., db 'unnamed') ``...multiply(5, 3)``>
 
@@ -1790,7 +1804,7 @@
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     <zc.async.job.Job (oid ..., db 'unnamed')
-     ``zc.async.job.success_or_failure(...)``>
+     ``...success()``>
     succeeded with result:
     None
 
@@ -1806,7 +1820,7 @@
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     <zc.async.job.Job (oid ..., db 'unnamed')
-     ``zc.async.job.success_or_failure(...)``>
+     ``...failure()``>
     succeeded with result:
     None
 
@@ -1828,7 +1842,7 @@
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    <...Job...``zc.async.doctest_test.multiply()``> failed with traceback:
+    <...Job...``...multiply()``> failed with traceback:
     *--- Failure #... (pickled) ---
     ...job.py:...
      [ Locals...
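The heart of the new callback-proxy behavior exercised in the doctest above is deferred selection: the proxy holds both callables and only picks one once the parent job's result is known. The following stand-ins are simplified sketches, not the real zc.async ``SuccessFailureCallbackProxy`` or Twisted ``Failure``:

```python
# Illustrative stand-in for the callback-proxy idea: defer the choice of
# success vs. failure callback until the parent job's result is known.

class Failure(object):
    """Simplified stand-in for twisted.python.failure.Failure."""
    def __init__(self, error):
        self.error = error

def _no_op(res):
    return res

class SuccessFailureProxy(object):
    def __init__(self, success=None, failure=None):
        self.success = success  # convenience accessors, like the real proxy
        self.failure = failure
        self.job = None         # set once the proxy is evaluated

    def getJob(self, result):
        # pick the callback that matches the result; fall back to a no-op
        if isinstance(result, Failure):
            self.job = self.failure or _no_op
        else:
            self.job = self.success or _no_op
        return self.job

proxy = SuccessFailureProxy(success=lambda res: res * 2)
assert proxy.getJob(21)(21) == 42
```

Because the unused callable is never turned into a job and never run, nothing about it appears in the logs, which is exactly the duplication problem the changelog describes.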

Modified: zc.async/trunk/src/zc/async/queue.py
===================================================================
--- zc.async/trunk/src/zc/async/queue.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/queue.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -121,11 +121,16 @@
                             job.handleInterrupt,
                             retry_policy_factory=zc.async.job.RetryCommonForever,
                             failure_log_level=logging.CRITICAL)
+                        # we don't make job's parent j because it shouldn't
+                        # really be needed and it would be a pain to clean up
                     elif job.status == zc.async.interfaces.CALLBACKS:
                         j = queue.put(
                             job.resumeCallbacks,
                             retry_policy_factory=zc.async.job.RetryCommonForever,
                             failure_log_level=logging.CRITICAL)
+                        # make job's parent j so that ``queue`` references work
+                        # in callbacks
+                        job.parent = j
                     elif job.status == zc.async.interfaces.COMPLETED:
                         # huh, that's odd.
                         agent.completed.add(job)
@@ -175,7 +180,15 @@
     def ping(self, uuid):
         da = self[uuid]
         if not da.activated:
-            raise ValueError('UUID is not activated.')
+            zc.async.utils.log.critical(
+                "Dispatcher %r not activated prior to ping. This can indicate "
+                "that the dispatcher's ping_death_interval is set too short, "
+                "or that some transactions in the system are taking too long "
+                "to commit. Activating, to correct the current problem, but "
+                "if the dispatcher was inappropriately viewed as ``dead`` and "
+                "deactivated, you should investigate the cause.",
+                uuid)
+            da.activate()
         now = datetime.datetime.now(pytz.UTC)
         last_ping = da.last_ping.value
         if (last_ping is None or

Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/testing.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -18,6 +18,8 @@
 # that test monkeypatching of time.sleep does not affect the usage in this
 # module
 import datetime
+import logging
+import sys
 
 import pytz
 import transaction
@@ -212,7 +214,6 @@
     else:
         assert False, 'job never completed'
 
-
 def wait_for_annotation(job, name):
     for i in range(60):
         t = transaction.begin()
@@ -225,3 +226,13 @@
         time_sleep(0.1)
     else:
         assert False, 'annotation never found'
+
+def print_logs(log_file=sys.stdout, log_level=logging.CRITICAL):
+    # really more of a debugging tool: show zc.async log output by
+    # attaching a stream handler to the zc.async logger.  The caller is
+    # responsible for removing the returned handler when done.
+    logger = logging.getLogger('zc.async')
+    handler = logging.StreamHandler(log_file)
+    logger.setLevel(log_level)
+    logger.addHandler(handler)
+    return handler
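The ``print_logs`` helper above just wires a ``StreamHandler`` onto the ``zc.async`` logger. A hypothetical debugging session might look like the following (the ``StringIO`` target and the log message are made up for demonstration; the helper is re-declared here so the sketch is self-contained):

```python
import logging
import sys
from io import StringIO  # Python 2 of the era would use StringIO.StringIO

def print_logs(log_file=sys.stdout, log_level=logging.CRITICAL):
    # same shape as the helper added to zc.async.testing above
    logger = logging.getLogger('zc.async')
    handler = logging.StreamHandler(log_file)
    logger.setLevel(log_level)
    logger.addHandler(handler)
    return handler

buf = StringIO()
handler = print_logs(buf, logging.ERROR)
logging.getLogger('zc.async').error('dispatcher %s died', 'abc123')
logging.getLogger('zc.async').removeHandler(handler)  # clean up afterwards
assert 'abc123' in buf.getvalue()
```

Removing the returned handler when the test is done keeps later tests from seeing stray log output.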

Added: zc.async/trunk/src/zc/async/threadlocal.py
===================================================================
--- zc.async/trunk/src/zc/async/threadlocal.py	                        (rev 0)
+++ zc.async/trunk/src/zc/async/threadlocal.py	2008-07-29 20:13:31 UTC (rev 88979)
@@ -0,0 +1,96 @@
+##############################################################################
+#
+# Copyright (c) 2006-2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Thread-local values and features.
+
+``local``, below, is reexported as zc.async.local
+"""
+import time
+import threading
+
+import zc.twist
+
+def _get(reactor, job, name, default, timeout, poll, deferred, start=None):
+    now = time.time()
+    if start is None:
+        start = now
+    if name in job.annotations:
+        res = job.annotations[name]
+    elif start + timeout < now:
+        res = default
+    else:
+        partial = zc.twist.Partial(
+            _get, reactor, job, name, default, timeout, poll, deferred,
+            start)
+        partial.setReactor(reactor)
+        reactor.callLater(min(poll, start + timeout - now), partial)
+        return
+    deferred.setResult(res)
+
+class Result(object):
+
+    result = None
+
+    def __init__(self):
+        self._event = threading.Event()
+
+    def setResult(self, value):
+        self.result = value
+        self._event.set()
+
+    def wait(self, *args):
+        self._event.wait(*args)
+
+class Local(threading.local):
+
+    job = None
+    dispatcher = None
+
+    def getJob(self):
+        return self.job
+
+    def getQueue(self):
+        return self.job.queue
+
+    def getDispatcher(self):
+        return self.dispatcher
+
+    def getReactor(self):
+        return self.dispatcher.reactor
+
+    def setLiveAnnotation(self, name, value, job=None):
+        if self.job is None or self.dispatcher.reactor is None:
+            raise ValueError('not initialized')
+        if job is None:
+            job = self.job
+        partial = zc.twist.Partial(
+            job.annotations.__setitem__, name, value)
+        partial.setReactor(self.dispatcher.reactor)
+        self.dispatcher.reactor.callFromThread(partial)
+
+    def getLiveAnnotation(self, name, default=None, timeout=0,
+                          poll=1, job=None):
+        if self.job is None or self.dispatcher.reactor is None:
+            raise ValueError('not initialized')
+        if job is None:
+            job = self.job
+        deferred = Result()
+        partial = zc.twist.Partial(
+            _get, self.dispatcher.reactor, job, name, default, timeout, poll,
+            deferred)
+        partial.setReactor(self.dispatcher.reactor)
+        self.dispatcher.reactor.callFromThread(partial)
+        deferred.wait(timeout+2)
+        return deferred.result
+
+local = Local()
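The ``Result`` class above is a tiny thread-safe future built on ``threading.Event``: ``getLiveAnnotation`` blocks in ``wait`` on the worker thread while the reactor thread delivers the value with ``setResult``. A standalone demonstration of that pattern (the worker thread here stands in for the reactor):

```python
import threading

class Result(object):
    # mirrors the Result helper in threadlocal.py above: a minimal
    # thread-safe future built on threading.Event
    result = None

    def __init__(self):
        self._event = threading.Event()

    def setResult(self, value):
        self.result = value
        self._event.set()   # wake up any waiter

    def wait(self, *args):
        self._event.wait(*args)

res = Result()
worker = threading.Thread(target=res.setResult, args=(42,))
worker.start()
res.wait(5)      # blocks until setResult fires, or the 5 second timeout
worker.join()
assert res.result == 42
```

Note that ``wait`` can return without a result if the timeout elapses first, which is why ``getLiveAnnotation`` passes ``timeout+2`` and falls back to the default value.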

Modified: zc.async/trunk/src/zc/async/tips.txt
===================================================================
--- zc.async/trunk/src/zc/async/tips.txt	2008-07-29 19:39:52 UTC (rev 88978)
+++ zc.async/trunk/src/zc/async/tips.txt	2008-07-29 20:13:31 UTC (rev 88979)
@@ -10,6 +10,12 @@
   your code into a job for low-conflict tasks and one or more jobs for
   high-conflict tasks, perhaps created in a callback.
 
+* Sometimes you can't avoid long transactions. But *really* try to avoid
+  long commits. Commits hold a lock on the ZODB, and if a single
+  transaction writes so much that its commit takes noticeable time,
+  realize that you are postponing every single subsequent commit to
+  the database.
+
 * Callbacks should be quick and reliable. If you want to do something that
   might take a while, put another job in the queue.
 
@@ -17,6 +23,10 @@
   don't want them to be retried!  Use the NeverRetry retry policy for these,
   as described in the `Recovering from Catastrophes`_ section below.
 
+* zc.async works fine with both Python 2.4 and Python 2.5.  Building
+  Twisted with Python 2.4 generates a SyntaxError in one of Twisted's
+  tests, but as of this writing Twisted 8.1.0 still supports Python 2.4.
+
 Testing Tips and Tricks
 =======================
 


