[Checkins] SVN: zc.async/branches/dev/ things are a bit in disarray, but want to save work, so checking in on a branch. branch started from trunk rev 86847.

Gary Poster gary at zope.com
Mon May 19 21:08:43 EDT 2008


Log message for revision 86848:
  things are a bit in disarray, but want to save work, so checking in on a branch.  branch started from trunk rev 86847.

Changed:
  U   zc.async/branches/dev/buildout.cfg
  U   zc.async/branches/dev/src/zc/async/TODO.txt
  U   zc.async/branches/dev/src/zc/async/catastrophes.txt
  U   zc.async/branches/dev/src/zc/async/dispatcher.py
  U   zc.async/branches/dev/src/zc/async/job.py
  U   zc.async/branches/dev/src/zc/async/queue.py
  U   zc.async/branches/dev/src/zc/async/utils.py

-=-
Modified: zc.async/branches/dev/buildout.cfg
===================================================================
--- zc.async/branches/dev/buildout.cfg	2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/buildout.cfg	2008-05-20 01:08:43 UTC (rev 86848)
@@ -13,13 +13,13 @@
 [test]
 recipe = zc.recipe.testrunner
 eggs = zc.async
-defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1'.split()
+defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1 --auto-color'.split()
 working-directory = ${buildout:directory}
 
 [z3test]
 recipe = zc.recipe.testrunner
 eggs = zc.async [z3]
-defaults = "--tests-pattern z3tests --exit-with-status -1".split()
+defaults = "--tests-pattern z3tests --exit-with-status -1 --auto-color".split()
 
 
 [interpreter]

Modified: zc.async/branches/dev/src/zc/async/TODO.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/TODO.txt	2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/TODO.txt	2008-05-20 01:08:43 UTC (rev 86848)
@@ -1,11 +1,60 @@
 Bugs and improvements:
 
-- need retry tasks, particularly for callbacks
+- need retry tasks, particularly for callbacks. ``retry_count`` affects aborts
+  and transaction failures?  None == infinity, which is what callbacks use?
+  Should retry have a cleanup function?
 
 - need CRITICAL logs for callbacks
 
-- when database went away, and then came back, async didn't come back.
+- when database went away, and then came back, async didn't come back.  Fix
+  (and also reconsider retry behavior in Dispatcher._commit and
+  AgentThreadPool.perform_thread).
 
+Traceback (most recent call last):
+  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/dispatcher.py", line 321, in _commit
+    transaction.commit()
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_manager.py", line 93, in commit
+    return self.get().commit()
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 325, in commit
+    self._commitResources()
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 422, in _commitResources
+    rm.tpc_begin(self)
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/Connection.py", line 525, in tpc_begin
+    self._normal_storage.tpc_begin(transaction)
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 1079, in tpc_begin
+    self._server.tpc_begin(id(txn), txn.user, txn.description,
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 86, in __getattr__
+    raise ClientDisconnected()
+ClientDisconnected
+
+Exception in thread Thread-5:
+Traceback (most recent call last):
+  File "/opt/cleanpython24/lib/python2.4/threading.py", line 442, in __bootstrap
+    self.run()
+  File "/opt/cleanpython24/lib/python2.4/threading.py", line 422, in run
+    self.__target(*self.__args, **self.__kwargs)
+  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/dispatcher.py", line 153, in perform_thread
+    job() # this does the committing and retrying, largely
+  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 290, in __call__
+    return self._call_with_retry(
+  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 321, in _call_with_retry
+    res = self._complete(zc.twist.Failure(), tm)
+  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 340, in _complete
+    tm.commit()
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_manager.py", line 93, in commit
+    return self.get().commit()
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 325, in commit
+    self._commitResources()
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 422, in _commitResources
+    rm.tpc_begin(self)
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/Connection.py", line 525, in tpc_begin
+    self._normal_storage.tpc_begin(transaction)
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 1079, in tpc_begin
+    self._server.tpc_begin(id(txn), txn.user, txn.description,
+  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 86, in __getattr__
+    raise ClientDisconnected()
+ClientDisconnected
+
 - be even more pessimistic about memory for saved polls and job info in
   dispatcher.
 

Modified: zc.async/branches/dev/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/catastrophes.txt	2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/catastrophes.txt	2008-05-20 01:08:43 UTC (rev 86848)
@@ -1,417 +1,537 @@
-Catastrophes
-============
+Recovering from Catastrophes
+============================
 
-Sometimes bad things happen in the course of processing tasks.  You might have
-a MemoryError while processing your main job, or some other failure might
-happen.  That's bad enough.  Of course, you can register some callbacks to
-handle the error, to do what you need to recover.
+------------
+Introduction
+------------
 
-But then what if the callback itself fails? Perhaps the situation that caused
-the main job to fail with a MemoryError will let the callback start, but not
-complete.  Then when a sibling dispatcher handles the incomplete job, the
-callback will fail.
+Sometimes bad things happen in the course of processing tasks. What might go
+wrong? How does zc.async handle these errors? What are your responsibilities?
 
-You, the user, do have some responsibilities. Callbacks should be very fast and
-light. If you want to do something that takes a long time, or might take a long
-time, have your callback put a new job in a queue for the long job. The
-callback itself should then complete, quickly out of the way. 
+First, what might go wrong?
 
-However, zc.async also has important responsibilities.
+- zc.async could have a problem while polling for jobs.  We'll call this a
+  "polling exception."
 
-This document examines catastrophes like the ones outlined above, to show how
-zc.async handles them, and how you can configure zc.async for these situations.
-Other documents in zc.async, such as the "Dead Dispatchers" section of
-queue.txt, look at this with some isolation and stubs; this document uses a
-complete zc.async set up to examine the system holistically [#setUp]_.
+- zc.async could have a problem while performing a particular job.  We'll call
+  this a "job-related exception."
 
-These are the scenarios we'll contemplate:
+For the purpose of this discussion, we will omit the possibility that zc.async
+has a bug. That is certainly a possibility, but the recovery story is not
+predictable, and if we knew of a bug, we'd try to fix it, rather than discuss
+it here!
 
-- The system has a single dispatcher. The dispatcher is working on a job with a
-  callback. The dispatcher dies, and then restarts, cleaning up.  We'll do two
-  variants, one with a graceful shutdown and one with a hard crash.
+Exploring polling exceptions and job-related exceptions will illuminate the
+more specific catastrophes you may encounter, and how your code and zc.async's
+can work together to handle them.  We'll discuss each, then drill down into
+some specific scenarios.
 
-- The system has two dispatchers. One dispatcher is working on a job with a
-  callback, and then dies. The other dispatcher cleans up.
+Polling Exceptions
+------------------
 
-- The system has a single dispatcher.  The dispatcher is working on a job, and
-  successfully completes it.  The callback begins, and then fails.
+Polling exceptions are, at least in theory, the least of your worries. You
+shouldn't have to worry about them; and if you do, it is probably a basic
+configuration problem that you need to address, such as making sure that the
+dispatcher process has access to the needed databases and software, or making
+sure that the dispatcher process is run under daemonizing software that will
+restart it if needed, such as zdaemon (http://pypi.python.org/pypi/zdaemon) or
+supervisor (http://supervisord.org/).
 
-- The system has a single dispatcher.  The dispatcher is working on a job, and
-  successfully completes it.  The callback begins, and then the dispatcher
-  dies.
+zc.async is largely responsible for dealing with polling exceptions. What does
+it have to handle?
 
-- The system has a single dispatcher.  The database goes away, and then comes
-  back.
+- The process running the poll ends, perhaps in the middle of a poll.
 
--------------------------------------------------
-Dispatcher Dies Gracefully While Performing a Job
--------------------------------------------------
+- zc.async cannot commit a transaction during the poll, for instance because of
+  a ConflictError, or because the database is unavailable.
 
-First let's consider how a failed job with a callback or two is handled when
-the dispatcher dies.
+What needs to happen to handle these problems?
 
-Here we start a job.
+Process Ends
+............
 
-    >>> import zope.component
-    >>> import threading
-    >>> import transaction
-    >>> import zc.async.interfaces
-    >>> import zc.async.testing
-    >>> import zc.async.dispatcher
+If the process ends, your daemonizing front-end (zdaemon, supervisor, etc.)
+needs to restart it. The ZODB will discard incomplete transaction data, if any.
 
-    >>> queue = root[zc.async.interfaces.KEY]['']
-    >>> lock = threading.Lock()
-    >>> lock.acquire()
-    True
-    >>> def wait_for_me():
-    ...     lock.acquire()
-    ...     lock.release() # so we can use the same lock again later
-    ...     raise SystemExit() # this will cause the worker thread to exit
-    ...
-    >>> def handle_error(result):
-    ...     return '...I handled the error...'
-    ...
-    >>> job = queue.put(wait_for_me)
-    >>> callback_job = job.addCallbacks(failure=handle_error)
-    >>> transaction.commit()
-    >>> dispatcher = zc.async.dispatcher.get()
-    >>> poll = zc.async.testing.get_poll(dispatcher)
-    >>> wait_for_start(job)
+The only thing a zc.async dispatcher needs to handle is cleanup.
 
-In this scenario, ``wait_for_me`` is a job that will "unexpectedly" be lost
-while the dispatcher stops working.  ``handle_error`` is the hypothetical
-handler that should be called if the ``wait_for_me`` job fails.
+- Ideally it will be able to deactivate its record in the ZODB during the
+  process shutdown.
 
-The job has started. Now, the dispatcher suddenly dies without the thread
-performing ``wait_for_me`` getting a chance to finish. For our first example,
-let's give the dispatcher a graceful exit. The dispatcher gets a chance to
-clean up its dispatcher agents, and job.fail() goes into the queue.
+- Instead, if it was a "hard crash" that didn't allow deactivation, a sibling
+  dispatcher will realize that the dispatcher is down and deactivate it.
 
-    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
-    >>> wait_to_deactivate(dispatcher)
-    >>> _ = transaction.begin()
-    >>> job.status == zc.async.interfaces.ACTIVE
-    True
-    >>> len(queue)
-    1
-    >>> fail_job = queue[0]
-    >>> fail_job
-    <zc.async.job.Job (oid 51, db 'unnamed') ``zc.async.job.Job (oid 30, db 'unnamed') :fail()``>
-    >>> queue[0].callable
-    <bound method Job.fail of <zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.wait_for_me()``>>
+- Or, finally, if it was a hard crash without a sibling, and the daemon
+  restarts a process for the original dispatcher instance, the new process
+  needs to realize that the old process is dead, not competing with it.
 
-Now when the process starts back up again, our callback will be performed.
+Transaction Error
+.................
 
-    >>> old_dispatcher = dispatcher
-    >>> zc.async.dispatcher.clear()
-    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
-    ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
-    >>> dispatcher = zc.async.dispatcher.get()
-    >>> zc.async.testing.wait_for_result(fail_job)
-    >>> job.status == zc.async.interfaces.COMPLETED
-    True
-    >>> job.result
-    <zc.twist.Failure zc.async.interfaces.AbortedError>
-    >>> callback_job.status == zc.async.interfaces.COMPLETED
-    True
-    >>> callback_job.result
-    '...I handled the error...'
+If the poll gets a conflict error, it should simply abort and retry the poll,
+forever, with a small back-off.
 
-So, our callback had a chance to do whatever it thought appropriate--in this
-case, simply returning a string--once the dispatcher got back online
-[#cleanup1]_.
+If the database goes away (perhaps the ZEO server goes down for a bit, and the
+ZEO client to which the dispatcher is connected is trying to reconnect) it
+should gracefully try to wait for the database to return, and resume when it
+does.
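+
+In rough outline, the reaction to both errors looks something like this (a
+simplified sketch of the commit-retry idea; the real logic lives in
+dispatcher.py and utils.py, and the exact back-off values may differ)::
+
+    import time
+    import transaction
+    import ZEO.Exceptions
+    import ZODB.POSException
+
+    def commit_with_retry(backoff=1):
+        # abort and retry forever on conflicts and disconnects,
+        # sleeping briefly between attempts
+        while True:
+            try:
+                transaction.commit()
+            except (ZODB.POSException.ConflictError,
+                    ZEO.Exceptions.ClientDisconnected):
+                transaction.abort()
+                time.sleep(backoff)
+            else:
+                break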
 
-------------------------------------------------
-Dispatcher Crashes "Hard" While Performing a Job
-------------------------------------------------
+Other, more dramatic errors, such as POSKey errors, are generally considered to
+be out of zc.async's domain and control. It should ideally continue to try to
+resume as long as the process is alive, in case somehow the situation improves,
+but this may be difficult and the expectations for zc.async's recovery are
+lower than with ConflictErrors and ClientDisconnected errors.
 
-Our next catastrophe only changes one aspect to the previous one: the
-dispatcher does not stop gracefully, and does not have a chance to clean up its
-active jobs.  It is a "hard" crash.
+Summary of Polling Exceptions
+.............................
 
-To show this, we will start a job, simulate the dispatcher dying "hard," and
-restart it so it clean up.
+To repeat, then, polling exceptions have two basic scenarios.
 
-So, first we start a long-running job in the dispatcher.
+If a dispatcher process ends, it needs to deactivate its record in the ZODB, or
+let another process know to deactivate it.
 
-    >>> lock.acquire()
-    True
-    >>> job = queue.put(wait_for_me)
-    >>> callback_job = job.addCallbacks(failure=handle_error)
-    >>> transaction.commit()
-    >>> dispatcher = zc.async.dispatcher.get()
-    >>> poll = zc.async.testing.get_poll(dispatcher)
-    >>> wait_for_start(job)
+If a ZODB.POSException.ConflictError or a ZEO.Exceptions.ClientDisconnected
+error occurs, retry forever with a small backoff; in the disconnected case,
+this amounts to waiting for the database to come back.
 
-Now we'll "crash" the dispatcher.
+Almost anything else should ideally keep zc.async attempting to re-poll, but
+that may not happen: expectations are lower.
 
-    >>> dispatcher.activated = False # this will make polling stop, without
-    ...                              # cleanup
-    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
-    >>> dispatcher.thread.join(3)
+Job-Related Exceptions
+----------------------
 
-Hard crashes can be detected because the dispatchers write datetimes to the
-database every few polls. A given dispatcher instance does this for each queue
-on a ``DispatcherAgents`` object available in ``queue.dispatchers[UUID]``,
-where ``UUID`` is the uuid of that dispatcher.
+What about job-related exceptions? Responsibility for handling job-related
+exceptions is shared between your code and zc.async's.  What might happen?
 
-The ``DispatcherAgents`` object has four pertinent attributes:
-``ping_interval``, ``ping_death_interval``, ``last_ping``, and ``dead``. About
-every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is supposed
-to write a ``datetime`` to ``last_ping``. If the ``last_ping`` plus the
-``ping_death_interval`` (also a ``timedelta``) is older than now, the
-dispatcher is considered to be ``dead``, and old jobs should be cleaned up.
+- Your job might fail internally.
 
-The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
-defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
-least two or three poll intervals (``zc.async.dispatcher.get().poll_interval``)
-greater than the ``ping_interval``.
+- The process running your task ends before completing your task.
 
-The ping hasn't timed out yet, so the dispatcher isn't considered dead yet.
+- zc.async cannot commit a transaction after your task completes, for instance
+  because of a ConflictError, or because the database is unavailable.
 
-    >>> _ = transaction.begin()
-    >>> import zc.async.instanceuuid
-    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
-    >>> da.ping_death_interval
-    datetime.timedelta(0, 60)
-    >>> da.ping_interval
-    datetime.timedelta(0, 30)
-    >>> bool(da.activated)
-    True
-    >>> da.dead
-    False
+What should occur to handle these problems?
 
-Therefore, the job is still sitting around in the dispatcher's pile in the
-database (the ``main`` key is for the ``main`` agent installed in this
-dispatcher in the set up for these examples).
+Job Fails
+.........
 
-    >>> job in da['main']
-    True
-    >>> job.status == zc.async.interfaces.ACTIVE
-    True
+As discussed elsewhere, if your job fails in your own code, this is mostly
+your responsibility. You should handle possible errors both within your job's
+code and in callbacks, as appropriate. zc.async's responsibilities are merely
+to report.
 
-Let's start our dispatcher up again.
+By default, zc.async will log a failure of a job entered in a queue at the
+"ERROR" level in the ``zc.async.events`` log, and it will log a failure of a
+callback or other internal job at the "CRITICAL" level. This can be controlled
+per-process and per-job, as we'll see below. These tracebacks include
+information about the local and global variables for each frame in the stack,
+which can be useful to deduce the problem that occurred.
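+
+For instance, a per-job override of the log level might look like this (a
+sketch based on the in-progress ``error_log_level`` attribute on jobs;
+``queue`` and ``might_fail`` are assumed from context, and the exact spelling
+is subject to change)::
+
+    import logging
+
+    job = queue.put(might_fail)            # hypothetical task
+    job.error_log_level = logging.WARNING  # log this job's failures as WARNING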
 
-    >>> old_dispatcher = dispatcher
-    >>> zc.async.dispatcher.clear()
-    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
-    ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
-    >>> dispatcher = zc.async.dispatcher.get()
+zc.async also sets a ``Failure`` object as the job's result, to let you react
+to the problem in a callback, and analyze it later.  This is discussed in
+detail elsewhere.
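+
+A callback can distinguish success from failure along these lines (a sketch;
+``examine_result`` is a hypothetical callback)::
+
+    import twisted.python.failure
+
+    def examine_result(result):
+        # callbacks receive the main job's result, which is a Failure
+        # when the main job raised an exception
+        if isinstance(result, twisted.python.failure.Failure):
+            return 'failed: %s' % (result.getErrorMessage(),)
+        return 'succeeded: %r' % (result,)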
 
-Initially, it's going to be a bit confused, because it sees that the
-DispatcherAgents object is ``activated``, and not ``dead``. It can't tell if
-there's another process using its same UUID, or if it is looking at the result
-of a hard crash.
+The RetryPolicy, discussed later, can try to react to exceptions from the job's
+code, but the default policies included in the package do not.
 
-    >>> zc.async.testing.wait_for_result(job, seconds=1)
-    Traceback (most recent call last):
-    ...
-    AssertionError: job never completed
-    >>> zc.async.testing.get_poll(dispatcher, seconds=1)
-    {'': None}
-    >>> for r in reversed(event_logs.records):
-    ...     if r.levelname == 'ERROR':
-    ...         break
-    ... else:
-    ...     assert False, 'did not find log'
-    ...
-    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    UUID ... already activated in queue  (oid 4): another process?
-    (To stop poll attempts in this process, set
-    ``zc.async.dispatcher.get().activated = False``.  To stop polls
-    permanently, don't start a zc.async.dispatcher!)
+Process Ends
+............
 
+If a process ends while it is performing a job, that is similar, in large part,
+to the process ending while polling: we need to restart the process, and
+recognize that we had started the job. But should we restart the job, or abort
+it?
 
-To speed up the realization of our dispatcher that the previous activation is
-``dead``, we'll set the ping_death_interval to just one second.
+Answering this question is a matter of policy, and requires knowing what each
+job does.
 
-    >>> _ = transaction.begin()
-    >>> da.dead
-    False
-    >>> import datetime
-    >>> da.ping_death_interval = datetime.timedelta(seconds=1)
-    >>> da.dead
-    True
-    >>> bool(da.activated)
-    True
-    >>> transaction.commit()
+Generally, if a job is fully transactional, such as writing something to the
+ZODB, and the job has not timed out yet, you'll want to restart it. You might
+want to restart some reasonably large number of times, and then suspect that,
+since you can't seem to finish the job, the job itself may be causing the
+process to die, and abort it.  Or perhaps you want to retry forever.
 
-After the next poll, the dispatcher will have cleaned up its old tasks in the
-same way we saw in the previous example. The job's ``fail`` method will be
-called, and the callback will be performed.  The DispatcherAgents object is
-no longer dead, because it is tied to the new instance of the dispatcher.
+If the job isn't transactional, such as communicating with an external service,
+you might want to abort the job, and set up some callbacks to handle the
+fallout.
 
-    >>> poll = zc.async.testing.get_poll(dispatcher)
-    >>> _ = transaction.begin()
-    >>> job in da['main']
-    False
-    >>> bool(da.activated)
-    True
-    >>> da.dead
-    False
-    >>> fail_job = job.parent
-    >>> fail_job
-    <zc.async.job.Job (oid 78, db 'unnamed') ``zc.async.job.Job (oid 57, db 'unnamed') :fail()``>
+As we'll see below, zc.async defaults to guessing that jobs placed directly in
+a queue are transactional, and can be restarted up to ten times; and that jobs
+used as callbacks are also transactional, and should be restarted until they
+succeed.  The defaults can be changed and the behavior of an individual
+job can be changed.
 
-Let's see it happen.
+These settings are controlled with a RetryPolicy, discussed below.
 
-    >>> zc.async.testing.wait_for_result(fail_job)
-    >>> job.status == zc.async.interfaces.COMPLETED
-    True
-    >>> job.result
-    <zc.twist.Failure zc.async.interfaces.AbortedError>
-    >>> callback_job.status == zc.async.interfaces.COMPLETED
-    True
-    >>> callback_job.result
-    '...I handled the error...'
+Transaction Error
+.................
 
-The dispatcher cleaned up its own "hard" crash.
+Handling transaction errors after processing a job is also similar to the
+handling of transaction errors for polling exceptions. ConflictErrors and
+ClientDisconnected errors should often cause jobs to be aborted and restarted.
+However, if the job is not transactional, such as communicating with an
+external service, a simple abort and retry may be hazardous. Also, many jobs
+should be stopped if they retry on ConflictError more than some heuristic
+threshold, on the logic that they may simply be doing something too
+problematic and are blocking other tasks from starting. But other jobs
+should be retried until they complete.
 
-[#cleanup2]_
+As mentioned above, zc.async defaults to guessing that jobs are transactional.
+ClientDisconnected errors are retried forever.  Jobs placed in a queue retry
+ConflictErrors five times, while callbacks retry them forever, with a small
+backoff.  The defaults, and the behavior of an individual job, can be changed
+using the RetryPolicy described below.
 
------------------------------------------------------------------
-Dispatcher Crashes "Hard" While Performing a Job, Sibling Resumes
------------------------------------------------------------------
+While custom RetryPolicies can try to handle other transaction errors, they are
+generally considered to be out of zc.async's domain and control.
 
-Our next catastrophe is the same as the one before, except, after one
-dispatcher's hard crash, another dispatcher is around to clean up the dead
-jobs.
+Summary of Job-Related Exceptions
+.................................
 
-To show this, we will start a job, start a second dispatcher, simulate the
-first dispatcher dying "hard," and watch the second dispatcher clean up
-after the first one.
+If an exception occurs in your job's code, zc.async will log it as an ERROR if
+it is a main queue job and as CRITICAL if it is a callback; and it will make
+the result of the call a ``Failure`` with error information, as shown
+elsewhere. Everything else is your responsibility, to be handled with
+try:except or try:finally blocks in your code, callbacks, or custom
+RetryPolicies.
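+
+For example, a job that talks to an external service might guard that step
+itself (a sketch; ``post_to_service`` is a hypothetical, non-transactional
+call)::
+
+    import socket
+
+    def notify_external_service(payload):
+        try:
+            post_to_service(payload)
+        except socket.error:
+            # a transient network problem: report it rather than re-raise,
+            # since blindly retrying a non-transactional call is hazardous
+            return 'service unavailable; not retrying'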
 
-So, first we start a long-running job in the dispatcher as before.
+Process death, conflict errors, and ``ClientDisconnected`` errors all may need
+to be handled differently for different jobs. zc.async has a default policy for
+jobs placed in a queue, and for callback jobs. The default policy, a
+RetryPolicy, can be changed and can be set explicitly per-job.
 
-    >>> lock.acquire()
-    True
-    >>> job = queue.put(wait_for_me)
-    >>> callback_job = job.addCallbacks(failure=handle_error)
-    >>> transaction.commit()
-    >>> dispatcher = zc.async.dispatcher.get()
-    >>> poll = zc.async.testing.get_poll(dispatcher)
-    >>> wait_for_start(job)
+Your Responsibilities
+---------------------
 
-Now we'll start up an alternate dispatcher.
+As the author of a zc.async job, your responsibilities, then, are to handle
+your own exceptions; and to make sure that the retry policy for each job is
+appropriate.  This is controlled with an IRetryPolicy, as shown below.
 
-    >>> import uuid
-    >>> alt_uuid = uuid.uuid1()
-    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
-    ...     poll_interval=0.5, uuid=alt_uuid)(
-    ...     zc.async.interfaces.DatabaseOpened(db))
-    >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
+As someone configuring a running dispatcher, you need to make sure that you
+give the dispatcher the necessary access to databases and software to perform
+your jobs, and you need to review (and rotate!) your logs.
 
-We're also going to set the main dispatcher's ``ping_death_interval`` back to
-60 seconds so we can see some polls in the alternate dispatcher before it gets
-around to cleaning up.
+zc.async's Responsibilities
+---------------------------
 
-    >>> da.ping_death_interval = datetime.timedelta(seconds=60)
-    >>> transaction.commit()
+zc.async needs to keep polling robust in the face of restarts, ConflictErrors,
+and ClientDisconnected errors. It needs to give your code a chance to decide
+what to do in these circumstances, and log your errors.
 
-Now we'll "crash" the dispatcher.
+Retry Policies
+--------------
 
-    >>> dispatcher.activated = False # this will make polling stop, without
-    ...                              # cleanup
-    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
-    >>> dispatcher.thread.join(3)
+The rest of the document uses scenarios to illustrate how zc.async handles
+errors, and how you might want to configure retry policies.  Retry policies are
+given a job and exception information, and can then determine whether zc.async
+should retry the job.  zc.async comes with three retry policies; a short usage
+sketch follows the list.
 
-As discussed in the previous example, the polling hasn't timed out yet, so the
-alternate dispatcher can't know that the first one is dead. Therefore, the job
-is still sitting around in the old dispatcher's pile in the database.
+- One is the absence of a retry policy: do not retry this job.  A value of
+  ``None`` represents this policy.  This is appropriate for tasks that are
+  not transactional.  They typically need to be handled with a callback.
 
-    >>> _ = transaction.begin()
-    >>> bool(da.activated)
+- The default is a retry policy that retries after a restart up to four times,
+  retries a ConflictError up to four times, and retries a ClientDisconnected up
+  to four times.
+
+- One is a retry policy that never gives up on restarts, ConflictErrors, or
+  ClientDisconnected errors: it has no limit, and keeps trying forever.
+
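+Selecting one of these per job might look roughly like this (a sketch based on
+the in-progress ``retry_policy`` attribute and policy classes in job.py;
+``queue`` and the two tasks are hypothetical, and the names may change)::
+
+    import zc.async.job
+
+    job = queue.put(send_invoice)     # not transactional: never retry;
+    job.retry_policy = None           # handle problems in callbacks instead
+
+    other = queue.put(update_totals)  # transactional: safe to retry forever
+    other.retry_policy = zc.async.job.RetrySystemErrorsForever
+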
+Scenarios
+---------
+
+We'll examine a number of scenarios, many of which combine problems in polling
+and in jobs.  In these scenarios, we'll refer to a job that was placed directly
+in a queue as a "queue job".  A job that was performed in any other way is a
+"callback job," because most often these are callbacks.
+
+- Polling errors
+
+  * The system is polling and gets a ConflictError.
+
+  * The system is polling and gets a ClientDisconnected error.
+
+- Internal job errors
+
+  * A worker process is polling and working on a queue job. The queue job fails
+    internally.
+
+  * A worker process is polling and working on a callback job.  The callback job
+    fails internally.
+
+- Default retry policy
+
+  * A worker process is working on a job with the default retry policy and gets
+    a ConflictError during the commit.
+  
+  * A worker process is working on a job with the default retry policy and gets
+    a ClientDisconnected error.
+  
+  * A worker process is working on a job with the default retry policy. The
+    process dies gracefully and restarts.
+  
+  * Like the previous scenario, a worker process is working on a job with the
+    default retry policy. The process crashes hard (does not die gracefully)
+    and restarts.
+  
+  * Like the previous scenario, a worker process is working on a job with the
+    default retry policy. The process crashes hard (does not die gracefully)
+    and a sibling notices and takes over.
+
+- No retry policy
+
+  * A worker process is working on a job without a retry policy (that is,
+    zc.async should not retry it) and gets a ConflictError during the commit.
+  
+  * A worker process is working on a job without a retry policy (that is,
+    zc.async should not retry it) and gets a ClientDisconnected error.
+  
+  * A worker process is working on a job with no retry policy (that is,
+    zc.async should not retry it). The process dies gracefully and restarts.
+  
+  * Like the previous scenario, a worker process is working on a job with no
+    retry policy (that is, zc.async should not retry it). The process crashes
+    hard (does not die gracefully) and restarts.
+  
+  * Like the previous scenario, a worker process is working on a job with no
+    retry policy (that is, zc.async should not retry it). The process crashes
+    hard (does not die gracefully) and a sibling notices and takes over.
+
+- Retry forever policy
+
+  * A worker process is working on a job with the retry-forever policy and gets
+    a ConflictError during the commit.
+  
+  * A worker process is working on a job with the retry-forever policy and gets
+    a ClientDisconnected error.
+  
+  * A worker process is working on a job with the retry-forever policy. The
+    process dies gracefully and restarts.
+  
+  * Like the previous scenario, a worker process is working on a job with the
+    retry-forever policy. The process crashes hard (does not die gracefully)
+    and restarts.
+  
+  * Like the previous scenario, a worker process is working on a job with the
+    retry-forever policy. The process crashes hard (does not die gracefully)
+    and a sibling notices and takes over.
+
+We will close with customizations:
+
+- custom retry policies, particularly for non-transactional tasks;
+
+- changing the default retry policy, per-process and per-agent; and
+
+- changing the default log level for queue jobs, callback jobs, and per-job.
+
+-------------------------
+Scenarios: Polling Errors
+-------------------------
+
+ConflictError
+-------------
+
+A common place for a conflict error is with two dispatchers trying to claim the
+same job from the queue.  This example will mimic that situation.
+
+Imagine we have a full set up with a dispatcher, agent, and queue. [#setUp]_
+We'll actually replace the agent's chooser with one that behaves badly: it
+blocks, waiting for our lock.
+
+    >>> import threading
+    >>> lock1 = threading.Lock()
+    >>> lock2 = threading.Lock()
+    >>> lock1.acquire()
     True
-    >>> da.dead
-    False
-    >>> job.status == zc.async.interfaces.ACTIVE
-    True
-    >>> alt_poll_1 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> lock2.acquire()
+    >>> def acquireLockAndchooseFirst(agent):
+    ...     res = agent.queue.claim()
+    ...     if res is not None:
+    ...         lock2.release()
+    ...         lock1.acquire()
+    ...     return res
+    ...
+    >>> import zc.async.instanceuuid
+    >>> import zc.async.interfaces
+    >>> import transaction
     >>> _ = transaction.begin()
-    >>> job in da['main']
-    True
-    >>> bool(da.activated)
-    True
-    >>> da.dead
-    False
-    >>> alt_poll_2 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> queues = root[zc.async.interfaces.KEY]
+    >>> queue = queues['']
+    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+    >>> agent = da['main']
+    >>> agent.chooser = acquireLockAndchooseFirst
+    >>> def returnSomething():
+    ...     return 42
+    ...
+    >>> job = queue.put(returnSomething)
+    >>> transaction.commit()
+
+Now, when the agent tries to get our job, we'll start and commit another
+transaction that removes it from the queue.  This will generate a conflict
+error for the poll's thread and transaction, because it cannot also remove the
+same job.
+
+    >>> lock2.acquire()
     >>> _ = transaction.begin()
-    >>> job in da['main']
+    >>> job is queue.pull()
     True
-    >>> bool(da.activated)
-    True
-    >>> da.dead
-    False
+    >>> transaction.commit()
+    >>> lock1.release()
 
-Above, the ping_death_interval was returned to the default of 60 seconds. To
-speed up the realization of our second dispatcher that the first one is dead,
-we'll set the ping_death_interval back down to just one second.
+However, the ConflictError is handled, and polling continues.
 
-    >>> da.ping_death_interval
-    datetime.timedelta(0, 60)
-    >>> import datetime
-    >>> da.ping_death_interval = datetime.timedelta(seconds=1)
-    >>> da.dead
+    >>> _ = transaction.begin()
+    >>> import zc.async.agent
+    >>> agent.chooser = zc.async.agent.chooseFirst
+    >>> transaction.commit()
+    >>> import zc.async.dispatcher
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> import zc.async.testing
+    >>> zc.async.testing.get_poll(dispatcher)
+
+And if we put the job back, it will be performed.
+
+    >>> job is queue.put(job)
     True
-    >>> bool(da.activated)
-    True
     >>> transaction.commit()
+    >>> zc.async.testing.wait_for_result(job)
+    42
 
-After the second dispatcher gets a poll--a chance to notice--it will have
-cleaned up the first dispatcher's old tasks in the same way we saw in the
-previous example.  The job's ``fail`` method will be called, and the callback
-will be performed.
+Client Disconnected
+-------------------
 
-    >>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
-    >>> _ = transaction.begin()
-    >>> job in da['main']
-    False
-    >>> bool(da.activated)
-    False
-    >>> da.dead
+The story is very similar if the ZEO connection goes away for a while.  We'll
+mimic a ZEO ClientDisconnected error by monkeypatching
+transaction.TransactionManager.commit.
+
+    >>> lock1.locked()
     True
-    >>> fail_job = job.parent
-    >>> fail_job
-    <zc.async.job.Job (oid 121, db 'unnamed') ``zc.async.job.Job (oid 84, db 'unnamed') :fail()``>
-    >>> zc.async.testing.wait_for_result(fail_job)
-    >>> job.status == zc.async.interfaces.COMPLETED
+    >>> lock2.locked()
     True
-    >>> job.result
-    <zc.twist.Failure zc.async.interfaces.AbortedError>
-    >>> callback_job.status == zc.async.interfaces.COMPLETED
-    True
-    >>> callback_job.result
-    '...I handled the error...'
 
-The sibling, then, was able to clean up the mess left by the "hard" crash of
-the first dispatcher.
+    >>> job = queue.put(returnSomething)
+    >>> transaction.commit()
 
-[#cleanup3]_
+    >>> lock2.acquire()
+    >>> import ZEO.Exceptions
+    >>> def commit(self):
+    ...     raise ZEO.Exceptions.ClientDisconnected()
+    ...
+    >>> import transaction
+    >>> old_commit = transaction.TransactionManager.commit
+    >>> transaction.TransactionManager.commit = commit
+    >>> lock1.release()
+    >>> zc.async.testing.get_poll(dispatcher)
+    >>> transaction.TransactionManager.commit = old_commit
+    >>> zc.async.testing.wait_for_result(job)
+    42
 
---------------
-Callback Fails
---------------
+Here's another variant that mimics being unable to read the storage during a
+poll, and then recovering.
 
--------------------------------
-Dispatcher Dies During Callback
--------------------------------
+    >>> error_raised = [False] # mutable flag so the chooser can set it
+    >>> def raiseDisconnectedThenChooseFirst(agent):
+    ...     if not error_raised[0]:
+    ...         error_raised[0] = True
+    ...         raise ZEO.Exceptions.ClientDisconnected()
+    ...     return agent.queue.claim()
+    >>> agent.chooser = raiseDisconnectedThenChooseFirst
+    >>> def returnSomething():
+    ...     return 42
+    ...
+    >>> job = queue.put(returnSomething)
+    >>> transaction.commit()
+    >>> zc.async.testing.get_poll(dispatcher)
+    >>> zc.async.testing.wait_for_result(job)
+    42
 
 ------------------------------
-Database Disappears For Awhile
+Scenarios: Internal Job Errors
 ------------------------------
 
----------------------------------------------
-Other Catastrophes, And Your Responsibilities
----------------------------------------------
+Queue Job
+---------
 
-There are some catastrophes from which there are no easy fixes like these.  For
-instance, imagine you have communicated with an external system, and gotten a
-reply that you have successfully made a transaction there, but then the
-dispatcher dies, or the database disappears, before you have a chance to commit
-the local transaction recording the success.  Your code needs to see that
+Callback Job
+------------
 
-...multidatabase...
+-------------------------------
+Scenarios: Default Retry Policy
+-------------------------------
 
+ConflictError
+-------------
+
+ClientDisconnected
+------------------
+
+Graceful Shutdown
+-----------------
+
+Hard Crash
+----------
+
+Hard Crash and Sibling
+----------------------
+
+--------------------------
+Scenarios: No Retry Policy
+--------------------------
+
+ConflictError
+-------------
+
+ClientDisconnected
+------------------
+
+Graceful Shutdown
+-----------------
+
+Hard Crash
+----------
+
+Hard Crash and Sibling
+----------------------
+
+-------------------------------
+Scenarios: Retry-Forever Policy
+-------------------------------
+
+ConflictError
+-------------
+
+ClientDisconnected
+------------------
+
+Graceful Shutdown
+-----------------
+
+Hard Crash
+----------
+
+Hard Crash and Sibling
+----------------------
+
+--------------
+Customizations
+--------------
+
+Changing the Default Retry Policy
+---------------------------------
+
+Creating a New Retry Policy
+---------------------------
+
+Changing the Log Level
+----------------------
+
+
+
+
+
+
+
+
+
+
 .. ......... ..
 .. Footnotes ..
 .. ......... ..
@@ -456,37 +576,4 @@
     ...             break
     ...         time.sleep(0.1)
     ...     else:
-    ...         assert False, 'dispatcher never deactivated'
-
-.. [#cleanup1]
-
-    >>> lock.release()
-    >>> old_dispatcher.thread.join(3)
-    >>> old_dispatcher.dead_pools[0].threads[0].join(3)
-
-.. [#cleanup2]
-
-    >>> lock.release()
-    >>> old_dispatcher.thread.join(3)
-    >>> for queue_pools in old_dispatcher.queues.values():
-    ...     for name, pool in queue_pools.items():
-    ...         pool.setSize(0)
-    ...         for thread in pool.threads:
-    ...             thread.join(3)
-    ...
-    -3
-
-.. [#cleanup3]
-
-    >>> lock.release()
-    >>> dispatcher.thread.join(3)
-    >>> for queue_pools in dispatcher.queues.values():
-    ...     for name, pool in queue_pools.items():
-    ...         pool.setSize(0)
-    ...         for thread in pool.threads:
-    ...             thread.join(3)
-    ...
-    -3
-    >>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
-    >>> alt_dispatcher.thread.join(3)
-    >>> alt_dispatcher.dead_pools[0].threads[0].join(3)
+    ...         assert False, 'dispatcher never deactivated'
\ No newline at end of file

Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py	2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py	2008-05-20 01:08:43 UTC (rev 86848)
@@ -152,16 +152,6 @@
                     local.job = job
                     try:
                         job() # this does the committing and retrying, largely
-                    except ZODB.POSException.TransactionError:
-                        transaction.abort()
-                        while 1:
-                            job.fail()
-                            try:
-                                transaction.commit()
-                            except ZODB.POSException.TransactionError:
-                                transaction.abort() # retry forever (!)
-                            else:
-                                break
                     except zc.async.interfaces.BadStatusError:
                         transaction.abort()
                         zc.async.utils.log.error( # notice, not tracelog
@@ -169,6 +159,7 @@
                         if job.status == zc.async.interfaces.CALLBACKS:
                             job.resumeCallbacks() # moves the job off the agent
                         else:
+                            count = 0
                             while 1:
                                 status = job.status
                                 if status == zc.async.interfaces.COMPLETED:
@@ -180,17 +171,38 @@
                                     job.fail() # moves the job off the agent
                                 try:
                                     transaction.commit()
-                                except ZODB.POSException.TransactionError:
+                                except (ZODB.POSException.TransactionError,
+                                        ZODB.POSException.POSError):
+                                    count += 1
+                                    if count and not count % 10:
+                                        zc.async.utils.log.critical(
+                                            'frequent database errors!  '
+                                            'I retry forever...',
+                                            exc_info=True)
+                                    time.sleep(1)
                                     transaction.abort() # retry forever (!)
                                 else:
                                     break
+                    except zc.async.utils.EXPLOSIVE_ERRORS:
+                        transaction.abort()
+                        raise
+                    except:
+                        # all errors should have been handled by the job at
+                        # this point, so anything other than BadStatusError,
+                        # SystemExit and KeyboardInterrupt are bad surprises.
+                        transaction.abort()
+                        zc.async.utils.log.critical(
+                            'unexpected error', exc_info=True)
+                        raise
                     # should come before 'completed' for threading dance
                     if isinstance(job.result, twisted.python.failure.Failure):
                         info['failed'] = True
                         info['result'] = job.result.getTraceback(
-                            elideFrameworkCode=True, detail='verbose')
+                            elideFrameworkCode=True)
                     else:
                         info['result'] = repr(job.result)
+                    if len(info['result']) > 10000:
+                        info['result'] = (
+                            info['result'][:10000] + '\n[...TRUNCATED...]')
                     info['completed'] = datetime.datetime.utcnow()
                 finally:
                     local.job = None
@@ -233,7 +245,7 @@
                 self.queue.put(None)
         return size - old # size difference
 
-# this is mostly for testing
+# this is mostly for testing, though ``get`` comes in handy generally
 
 _dispatchers = {}
 
@@ -249,6 +261,8 @@
 
 clear = _dispatchers.clear
 
+# end of testing bits
+
 class Dispatcher(object):
 
     activated = False
@@ -292,57 +306,22 @@
         self.dead_pools = []
 
     def _getJob(self, agent):
-        try:
-            job = agent.claimJob()
-        except zc.twist.EXPLOSIVE_ERRORS:
-            transaction.abort()
-            raise
-        except:
-            transaction.abort()
-            zc.async.utils.log.error(
-                'Error trying to get job for UUID %s from '
-                'agent %s (oid %s) in queue %s (oid %s)', 
-                self.UUID, agent.name, agent._p_oid,
-                agent.queue.name,
-                agent.queue._p_oid, exc_info=True)
-            return zc.twist.Failure()
-        res = self._commit(
-            'Error trying to commit getting a job for UUID %s from '
-            'agent %s (oid %s) in queue %s (oid %s)' % (
-            self.UUID, agent.name, agent._p_oid,
-            agent.queue.name,
-            agent.queue._p_oid))
-        if res is None:
-            # Successful commit
-            res = job
+        identifier = (
+            'getting job for UUID %s from agent %s (oid %d) '
+            'in queue %s (oid %d)' % (
+                self.UUID, agent.name, ZODB.utils.u64(agent._p_oid),
+                agent.queue.name, ZODB.utils.u64(agent.queue._p_oid)))
+        res = zc.async.utils.try_transaction_five_times(
+            agent.claimJob, identifier, transaction)
+        if isinstance(res, twisted.python.failure.Failure):
+            identifier = 'stashing failure on agent %s (oid %s)' % (
+                agent.name, ZODB.utils.u64(agent._p_oid))
+            def setFailure():
+                agent.failure = res
+            zc.async.utils.try_transaction_five_times(
+                setFailure, identifier, transaction)
         return res
 
-    def _commit(self, debug_string=''):
-        retry = 0
-        while 1:
-            try:
-                transaction.commit()
-            except ZODB.POSException.TransactionError:
-                transaction.abort()
-                if retry >= 5:
-                    zc.async.utils.log.error(
-                        'Repeated transaction error trying to commit in '
-                        'zc.async: %s', 
-                        debug_string, exc_info=True)
-                    return zc.twist.Failure()
-                retry += 1
-            except zc.twist.EXPLOSIVE_ERRORS:
-                transaction.abort()
-                raise
-            except:
-                transaction.abort()
-                zc.async.utils.log.error(
-                    'Error trying to commit: %s', 
-                    debug_string, exc_info=True)
-                return zc.twist.Failure()
-            else:
-                break
-
     def poll(self):
         poll_info = PollInfo()
         started_jobs = []
@@ -358,29 +337,30 @@
                     queue.dispatchers.register(self.UUID)
                 da = queue.dispatchers[self.UUID]
                 if queue._p_oid not in self._activated:
-                    if da.activated:
-                        if da.dead:
-                            da.deactivate()
-                        else:
-                            zc.async.utils.log.error(
-                                'UUID %s already activated in queue %s '
-                                '(oid %d): another process?  (To stop '
-                                'poll attempts in this process, set '
-                                '``zc.async.dispatcher.get().activated = '
-                                "False``.  To stop polls permanently, don't "
-                                'start a zc.async.dispatcher!)',
-                                self.UUID, queue.name,
-                                ZODB.utils.u64(queue._p_oid))
-                            continue
-                    da.activate()
-                    self._activated.add(queue._p_oid)
-                    # removed below if transaction fails
-                    res = self._commit(
-                        'Error trying to commit activation of UUID %s in '
-                        'queue %s (oid %s)' % (
-                            self.UUID, queue.name, queue._p_oid))
-                    if res is not None:
-                        self._activated.remove(queue._p_oid)
+                    identifier = (
+                        'activating dispatcher UUID %s in queue %s (oid %d)' %
+                        (self.UUID, queue.name, ZODB.utils.u64(queue._p_oid)))
+                    def activate():
+                        if da.activated:
+                            if da.dead:
+                                da.deactivate()
+                            else:
+                                zc.async.utils.log.error(
+                                    'UUID %s already activated in queue %s '
+                                    '(oid %d): another process?  (To stop '
+                                    'poll attempts in this process, set '
+                                    '``zc.async.dispatcher.get().activated = '
+                                    "False``.  To stop polls permanently, don't "
+                                    'start a zc.async.dispatcher!)',
+                                    self.UUID, queue.name,
+                                    ZODB.utils.u64(queue._p_oid))
+                                return False
+                        da.activate()
+                        return True
+                    if zc.async.utils.try_transaction_five_times(
+                        activate, identifier, transaction) is True:
+                        self._activated.add(queue._p_oid)
+                    else:
                         continue
                 queue_info = poll_info[queue.name] = {}
                 pools = self.queues.get(queue.name)
@@ -398,7 +378,7 @@
                     try:
                         agent_info['size'] = agent.size
                         agent_info['len'] = len(agent)
-                    except zc.twist.EXPLOSIVE_ERRORS:
+                    except zc.async.utils.EXPLOSIVE_ERRORS:
                         raise
                     except:
                         agent_info['error'] = zc.twist.Failure()
@@ -419,17 +399,6 @@
                         if isinstance(job, twisted.python.failure.Failure):
                             agent_info['error'] = job
                             job = None
-                            try:
-                                agent.failure = res
-                            except zc.twist.EXPLOSIVE_ERRORS:
-                                raise
-                            except:
-                                transaction.abort()
-                                zc.async.utils.log.error(
-                                    'error trying to stash failure on agent')
-                            else:
-                                # TODO improve msg
-                                self._commit('trying to stash failure on agent')
                         else:
                             info = {'result': None,
                                     'failed': False,
@@ -448,8 +417,10 @@
                             pool.queue.put(
                                 (job._p_oid, dbname, info))
                             job = self._getJob(agent)
-                queue.dispatchers.ping(self.UUID)
-                self._commit('trying to commit ping')
+                identifier = 'committing ping for UUID %s' % (self.UUID,)
+                zc.async.utils.try_transaction_five_times(
+                    lambda: queue.dispatchers.ping(self.UUID), identifier,
+                    transaction)
                 if len(pools) > len(queue_info):
                     conn_delta = 0
                     for name, pool in pools.items():
@@ -539,13 +510,16 @@
         try:
             transaction.begin()
             try:
-                queues = self.conn.root().get(zc.async.interfaces.KEY)
-                if queues is not None:
-                    for queue in queues.values():
-                        da = queue.dispatchers.get(self.UUID)
-                        if da is not None and da.activated:
-                            da.deactivate()
-                    self._commit('trying to tear down')
+                identifier = 'cleanly deactivating UUID %s' % (self.UUID,)
+                def deactivate_das():
+                    queues = self.conn.root().get(zc.async.interfaces.KEY)
+                    if queues is not None:
+                        for queue in queues.values():
+                            da = queue.dispatchers.get(self.UUID)
+                            if da is not None and da.activated:
+                                da.deactivate()
+                zc.async.utils.try_transaction_five_times(
+                    deactivate_das, identifier, transaction)
             finally:
                 transaction.abort()
                 self.conn.close()

Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py	2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/job.py	2008-05-20 01:08:43 UTC (rev 86848)
@@ -11,11 +11,14 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
+import time
 import types
 import datetime
+import logging
 
 import BTrees.OOBTree
 import ZODB.POSException
+import ZEO.Exceptions
 import transaction.interfaces
 import persistent
 import persistent.list
@@ -74,6 +77,110 @@
                     elif status == zc.async.interfaces.CALLBACKS:
                         a.resumeCallbacks()
 
+class IRetries(zope.interface.Interface): # XXX move
+    def jobError(failure, data_cache):
+        """whether and how to retry after an error while performing job.
+        
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to
+        reschedule immediately, before any pending calls in the queue."""
+
+    def transactionError(failure, data_cache):
+        """whether to retry after trying to commit a job's successful result.
+        
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to
+        reschedule immediately, before any pending calls in the queue."""
+
+    def interrupted():
+        """whether to retry after a dispatcher dies when job was in progress.
+        
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to
+        reschedule immediately, before any pending calls in the queue."""
+
+    def updateData(data_cache):
+        """right before committing a job, retry is given a chance to stash
+        information it has saved in the data_cache."""
+
+class Retries(persistent.Persistent): # default for '' IRetries
+    zope.component.adapts(zc.async.interfaces.IJob)
+    zope.interface.implements(zc.async.interfaces.IRetries)
+
+    # exceptions, data_cache key, max retry, initial backoff seconds,
+    # incremental backoff seconds, max backoff seconds
+    internal_exceptions = (
+        ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
+         None, 5, 5, 60),
+        ((ZODB.POSException.TransactionError,), 'transaction_error',
+         5, 0, 0, 0),
+    )
+    transaction_exceptions = internal_exceptions
+    max_interruptions = 10
+
+    def __init__(self, job):
+        self.parent = self.__parent__ = job
+        self.data = BTrees.family32.OO.BTree()
+
+    def updateData(self, data_cache):
+        if 'first_active' in self.data and 'first_active' in data_cache:
+            data_cache.pop('first_active')
+        self.data.update(data_cache)
+
+    def jobError(self, failure, data_cache):
+        return self._process(failure, data_cache, self.internal_exceptions)
+
+    def transactionError(self, failure, data_cache):
+        return self._process(failure, data_cache, self.transaction_exceptions)
+
+    def _process(self, failure, data_cache, exceptions):
+        for (exc, key, max_count, init_backoff,
+             incr_backoff, max_backoff) in exceptions:
+            if failure.check(*exc) is not None:
+                count = data_cache.get(key, 0) + 1
+                if max_count is not None and count >= max_count:
+                    return False
+                backoff = min(max_backoff,
+                              (init_backoff + (count-1) * incr_backoff))
+                if backoff:
+                    time.sleep(backoff)
+                data_cache[key] = count
+                data_cache['last_' + key] = failure
+                if 'first_active' not in data_cache:
+                    data_cache['first_active'] = self.parent.active_start
+                return True
+        return False
+
+    def interrupted(self):
+        if 'first_active' not in self.data:
+            self.data['first_active'] = self.parent.active_start
+        ct = self.data['interruptions'] = self.data.get('interruptions', 0) + 1
+        return self.max_interruptions is None or ct <= self.max_interruptions
+
+class RetrySystemErrorsForever(Retries): # default for 'callback' IRetries
+    # retry ZEO disconnects and transaction errors raised during the job
+    # forever; also retry commit-time transaction errors and interruptions
+    # forever.
+    internal_exceptions = (
+        ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
+         None, 5, 5, 60),
+        ((ZODB.POSException.TransactionError,), 'transaction_error',
+         None, 0, 0, 0),
+    )
+    
+    max_interruptions = None
+
+    def transactionError(self, failure, data_cache):
+        res = super(RetrySystemErrorsForever, self).transactionError(
+            failure, data_cache)
+        if not res:
+            # a False result just means the error was not recorded; we are
+            # going to retry anyway, so record it under 'other'.
+            key = 'other'
+            data_cache[key] = data_cache.get(key, 0) + 1
+            data_cache['last_' + key] = failure
+            if 'first_active' not in data_cache:
+                data_cache['first_active'] = self.parent.active_start
+        return True # always retry
+
 class Job(zc.async.utils.Base):
 
     zope.interface.implements(zc.async.interfaces.IJob)
@@ -82,7 +189,26 @@
     _status = zc.async.interfaces.NEW
     _begin_after = _begin_by = _active_start = _active_end = None
     key = None
-    
+    # default_retry_policy and retry_policy should each be either a name used
+    # to adapt the job to IRetries, a factory, or None.
+    default_retry_policy = ''
+    retry_policy = None
+    retries = None
+    default_error_log_level = logging.ERROR
+    error_log_level = None
+
+    @property
+    def effective_error_log_level(self):
+        if self.error_log_level is None:
+            return self.default_error_log_level
+        return self.error_log_level
+
+    @property
+    def effective_retry_policy(self):
+        if self.retry_policy is None:
+            return self.default_retry_policy
+        return self.retry_policy
+
     assignerUUID = None
     _quota_names = ()
 
@@ -227,7 +353,8 @@
     @rwproperty.setproperty
     def callable(self, value):
         # can't pickle/persist methods by default as of this writing, so we
-        # add the sugar ourselves
+        # add the sugar ourselves.  In the future, we would like args to be
+        # able to be methods of persistent objects too...
         if self._status != zc.async.interfaces.NEW:
             raise zc.async.interfaces.BadStatusError(
                 'can only set callable when a job has NEW, PENDING, or '
@@ -243,12 +370,25 @@
         if zc.async.interfaces.IJob.providedBy(self._callable_root):
             self._callable_root.parent = self
 
-    def addCallbacks(self, success=None, failure=None):
+    def addCallbacks(self, success=None, failure=None,
+                     error_log_level=None, retry_policy=None):
         if success is not None or failure is not None:
             if success is not None:
                 success = zc.async.interfaces.IJob(success)
+                success.default_error_log_level = logging.CRITICAL
+                if error_log_level is not None:
+                    success.error_log_level = error_log_level
+                success.default_retry_policy = 'callback'
+                if retry_policy is not None:
+                    success.retry_policy = retry_policy
             if failure is not None:
                 failure = zc.async.interfaces.IJob(failure)
+                failure.default_error_log_level = logging.CRITICAL
+                if error_log_level is not None:
+                    failure.error_log_level = error_log_level
+                failure.default_retry_policy = 'callback'
+                if retry_policy is not None:
+                    failure.retry_policy = retry_policy
             res = Job(success_or_failure, success, failure)
             if success is not None:
                 success.parent = res
@@ -260,12 +400,18 @@
             abort_handler = zc.async.interfaces.IJob(
                 completeStartedJobArguments)
             abort_handler.args.append(res)
-            res.addCallback(abort_handler)
+            # addCallback already applies the CRITICAL default log level, the
+            # 'callback' default retry policy, and any explicit overrides.
+            res.addCallback(abort_handler, error_log_level, retry_policy)
         else:
             res = self
         return res
 
-    def addCallback(self, callback):
+    def addCallback(self, callback, error_log_level=None, retry_policy=None):
         callback = zc.async.interfaces.IJob(callback)
         self.callbacks.put(callback)
         callback.parent = self
@@ -274,8 +420,39 @@
         else:
             self._p_changed = True # to try and fire conflict errors if
             # our reading of self.status has changed beneath us
+        callback.default_error_log_level = logging.CRITICAL
+        if error_log_level is not None:
+            callback.error_log_level = error_log_level
+        callback.default_retry_policy = 'callback'
+        if retry_policy is not None:
+            callback.retry_policy = retry_policy
         return callback
 
+    def _getRetry(self, call_name, tm, *args):
+        def getRetry():
+            retries = self.retries
+            if retries is None:
+                retry_policy = self.effective_retry_policy
+                if retry_policy is None:
+                    return None # means, do not retry ever
+                elif isinstance(retry_policy, basestring):
+                    retries = zope.component.getAdapter(
+                        self, zc.async.interfaces.IRetries,
+                        name=retry_policy)
+                else:
+                    retries = retry_policy(self)
+                if retries is not None:
+                    self.retries = retries
+            call = getattr(retries, call_name, None)
+            if call is None:
+                zc.async.utils.log.error(
+                    'retries %r for %r does not have required %s method',
+                    retries, self, call_name)
+                return None
+            return call(*args)
+        identifier = 'getting %s retry for %r' % (call_name, self)
+        return zc.async.utils.never_fail(getRetry, identifier, tm)
+
     def __call__(self, *args, **kwargs):
         if self.status not in (zc.async.interfaces.NEW,
                                zc.async.interfaces.ASSIGNED):
@@ -289,68 +466,146 @@
         effective_args[0:0] = self.args
         effective_kwargs = dict(self.kwargs)
         effective_kwargs.update(kwargs)
-        return self._call_with_retry(
-            lambda: self.callable(*effective_args, **effective_kwargs))
-
-    def _call_with_retry(self, call):
-        ct = 0
-        tm = transaction.interfaces.ITransactionManager(self)
+        # This is the calling code.  It is complex and long because it tries
+        # both to handle exceptions reasonably and to honor the IRetries
+        # interface for those exceptions.
+        tm = transaction.interfaces.ITransactionManager(self)
+        data_cache = {}
         res = None
         while 1:
             try:
-                res = call()
-                if zc.async.interfaces.IJob.providedBy(res):
-                    res.addCallback(self._callback)
-                    tm.commit()
-                elif isinstance(res, twisted.internet.defer.Deferred):
-                    res.addBoth(zc.twist.Partial(self._callback))
-                    tm.commit()
-                else:
-                    res = self._complete(res, tm)
-            except ZODB.POSException.TransactionError:
+                res = self.callable(*effective_args, **effective_kwargs)
+            except zc.async.utils.EXPLOSIVE_ERRORS:
                 tm.abort()
-                ct += 1
-                if ct >= 5:
-                    res = self._complete(zc.twist.Failure(), tm)
-                    self.resumeCallbacks()
-                else:
+                raise
+            except:
+                res = zc.twist.Failure()
+                tm.abort()
+                retry = self._getRetry('jobError', tm, res, data_cache)
+                if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+                    identifier = (
+                        'rescheduling %r as requested by '
+                        'associated IRetries %r' % (
+                            self, self.retries))
+                    if self is zc.async.utils.never_fail(
+                        lambda: self._reschedule(retry, data_cache),
+                        identifier, tm):
+                        return self
+                elif retry:
                     continue
-            except zc.twist.EXPLOSIVE_ERRORS:
+            try:
+                self._set_result(res, tm, data_cache)
+            except zc.async.utils.EXPLOSIVE_ERRORS:
                 tm.abort()
                 raise
             except:
+                if isinstance(res, twisted.python.failure.Failure):
+                    zc.async.utils.log.log(
+                        self.effective_error_log_level,
+                        'Commit failed for %r (see subsequent traceback).  '
+                        'Prior to this, job originally failed with '
+                        'traceback:\n%s',
+                        self,
+                        res.getTraceback(
+                            elideFrameworkCode=True, detail='verbose'))
+                else:
+                    zc.async.utils.tracelog.info(
+                        'Commit failed for %r (see subsequent traceback).  '
+                        'Prior to this, job succeeded with result: %r',
+                        self, res)
+                res = zc.twist.Failure()
                 tm.abort()
-                res = self._complete(zc.twist.Failure(), tm)
-                self.resumeCallbacks()
-            else:
-                if self._status == zc.async.interfaces.CALLBACKS:
-                    self.resumeCallbacks()
+                retry = self._getRetry(
+                    'transactionError', tm, res, data_cache)
+                if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+                    identifier = (
+                        'rescheduling %r as requested by '
+                        'associated IRetries %r' % (
+                            self, self.retries))
+                    if self is zc.async.utils.never_fail(
+                        lambda: self._reschedule(retry, data_cache),
+                        identifier, tm):
+                        return self
+                elif retry:
+                    continue
+                # retries didn't exist or returned False
+                def complete():
+                    self._result = res
+                    self._status = zc.async.interfaces.CALLBACKS
+                    self._active_end = datetime.datetime.now(pytz.UTC)
+                    if self.retries is not None:
+                        self.retries.updateData(data_cache)
+                identifier = ('storing failure at commit of %r' % (self,))
+                zc.async.utils.never_fail(complete, identifier, tm)
+            self._complete(res)
             return res
 
-    def _callback(self, res):
-        self._call_with_retry(lambda: res)
+    def handleInterrupt(self):
+        # this is called either within a job (that has a never-fail retry
+        # policy) or within _resumeCallbacks (which uses never_fail)
+        if self.status is not zc.async.interfaces.ACTIVE:
+            raise zc.async.interfaces.BadStatusError(
+                'can only call ``handleInterrupt`` on a job with ACTIVE '
+                'status')
+        tm = transaction.interfaces.ITransactionManager(self)
+        retry = self._getRetry('interrupted', tm)
+        if retry:
+            if not isinstance(retry, (datetime.timedelta, datetime.datetime)):
+                retry = datetime.timedelta()
+            self._reschedule(retry)
+        else:
+            self.fail()
 
-    def _complete(self, res, tm):
-        if isinstance(res, twisted.python.failure.Failure):
-            res = zc.twist.sanitize(res)
-            failure = True
+    def _reschedule(self, retry, data_cache=None):
+        if not zc.async.interfaces.IAgent.providedBy(self.parent):
+            zc.async.utils.log.error(
+                'error for IRetries %r on %r: '
+                'can only reschedule a job directly in an agent',
+                self.retries, self)
+            return None
+        self._status = zc.async.interfaces.NEW
+        del self._active_start
+        if data_cache is not None and self.retries is not None:
+            self.retries.updateData(data_cache)
+        self.parent.reschedule(self, retry)
+        return self
+
+    def _set_result(self, res, tm, data_cache=None):
+        if zc.async.interfaces.IJob.providedBy(res):
+            res.addCallback(self._callback)
+        elif isinstance(res, twisted.internet.defer.Deferred):
+            res.addBoth(zc.twist.Partial(self._callback))
+            # XXX need to tell Partial to retry forever
         else:
-            failure = False
-        self._result = res
-        self._status = zc.async.interfaces.CALLBACKS
-        self._active_end = datetime.datetime.now(pytz.UTC)
+            if isinstance(res, twisted.python.failure.Failure):
+                res = zc.twist.sanitize(res)
+            self._result = res
+            self._status = zc.async.interfaces.CALLBACKS
+            self._active_end = datetime.datetime.now(pytz.UTC)
+        if data_cache is not None and self.retries is not None:
+            self.retries.updateData(data_cache)
         tm.commit()
-        if failure:
-            zc.async.utils.tracelog.error(
+
+    def _complete(self, res):
+        if isinstance(res, twisted.python.failure.Failure):
+            zc.async.utils.log.log(
+                self.effective_error_log_level,
                 '%r failed with traceback:\n%s',
                 self,
-                res.getTraceback(elideFrameworkCode=True, detail='verbose'))
+                res.getTraceback(
+                    elideFrameworkCode=True, detail='verbose'))
         else:
             zc.async.utils.tracelog.info(
-                '%r succeeded with result:\n%r',
+                '%r succeeded with result: %r',
                 self, res)
-        return res
+        self.resumeCallbacks()
 
+    def _callback(self, res):
+        # done within a job or partial, so we can rely on their retry bits to
+        # some degree.  However, we commit transactions ourselves, so we have
+        # to be a bit careful that the result hasn't been set already.
+        if self._status == zc.async.interfaces.ACTIVE:
+            self._set_result(
+                res, transaction.interfaces.ITransactionManager(self))
+        self._complete(res)
+
     def fail(self, e=None):
         if e is None:
             e = zc.async.interfaces.AbortedError()
@@ -359,14 +614,17 @@
             raise zc.async.interfaces.BadStatusError(
                 'can only call fail on a job with NEW, PENDING, ASSIGNED, or '
                 'ACTIVE status')
-        self._complete(zc.twist.Failure(e),
-                       transaction.interfaces.ITransactionManager(self))
-        self.resumeCallbacks()
+        self._set_result(zc.twist.Failure(e),
+                         transaction.interfaces.ITransactionManager(self))
+        self._complete(self._result)
 
     def resumeCallbacks(self):
         if self._status != zc.async.interfaces.CALLBACKS:
             raise zc.async.interfaces.BadStatusError(
                 'can only resumeCallbacks on a job with CALLBACKS status')
+        identifier = 'performing callbacks for %r' % (self,)
+        tm = transaction.interfaces.ITransactionManager(self)
+        zc.async.utils.never_fail(self._resumeCallbacks, identifier, tm)
+
+    def _resumeCallbacks(self):
         callbacks = list(self.callbacks)
         tm = transaction.interfaces.ITransactionManager(self)
         length = 0
@@ -379,27 +637,21 @@
                 elif j._status == zc.async.interfaces.ACTIVE:
                     zc.async.utils.tracelog.debug(
                         'failing aborted callback %r to %r', j, self)
-                    j.fail()
+                    j.handleInterrupt()
                 elif j._status == zc.async.interfaces.CALLBACKS:
                     j.resumeCallbacks()
                 # TODO: this shouldn't raise anything we want to catch, right?
                 # now, this should catch all the errors except EXPLOSIVE_ERRORS
                 # cleaning up dead jobs should look something like the above.
-            tm.commit()
             tm.begin() # syncs
             # it's possible that someone added some callbacks, so run until
             # we're exhausted.
             length += len(callbacks)
             callbacks = list(self.callbacks)[length:]
             if not callbacks:
-                try:
-                    self._status = zc.async.interfaces.COMPLETED
-                    if zc.async.interfaces.IAgent.providedBy(self.parent):
-                        self.parent.jobCompleted(self)
-                    tm.commit()
-                except ZODB.POSException.TransactionError:
-                    tm.abort()
-                    callbacks = list(self.callbacks)[length:]
-                else:
-                    break # and return
+                # this whole method is called within a never_fail...
+                self._status = zc.async.interfaces.COMPLETED
+                if zc.async.interfaces.IAgent.providedBy(self.parent):
+                    self.parent.jobCompleted(self)
+                tm.commit()
+                break # and return
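
A rough sketch of how the new Job knobs are meant to fit together, based on
the code above.  Everything named here (RetryTwiceQuickly, compute, report)
is hypothetical: retry_policy may be a factory such as a Retries subclass, or
the name of a registered IRetries adapter, and error_log_level overrides the
CRITICAL default that callbacks receive.

    import logging

    import ZEO.Exceptions
    import ZODB.POSException

    from zc.async.job import Job, Retries

    class RetryTwiceQuickly(Retries):
        # Same tuple layout as Retries.internal_exceptions above:
        # (exceptions, data_cache key, max retries, initial backoff seconds,
        # backoff increment seconds, max backoff seconds).
        internal_exceptions = (
            ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
             None, 1, 1, 10),
            ((ZODB.POSException.TransactionError,), 'transaction_error',
             3, 0, 0, 0),
        )
        transaction_exceptions = internal_exceptions
        max_interruptions = 2

    def compute(): # hypothetical application callable
        return 42

    def report(result): # hypothetical callback
        return result

    job = Job(compute)
    # The factory is stored as retry_policy on the callback job; _getRetry
    # will call it the first time a retry decision is needed.
    job.addCallback(Job(report), error_log_level=logging.ERROR,
                    retry_policy=RetryTwiceQuickly)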
 

Modified: zc.async/branches/dev/src/zc/async/queue.py
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.py	2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/queue.py	2008-05-20 01:08:43 UTC (rev 86848)
@@ -110,7 +110,7 @@
                         queue.put(job)
                         job.assignerUUID = tmp
                     elif job.status == zc.async.interfaces.ACTIVE:
-                        queue.put(job.fail)
+                        queue.put(job.handleInterrupt)
                     elif job.status == zc.async.interfaces.CALLBACKS:
                         queue.put(job.resumeCallbacks)
                     elif job.status == zc.async.interfaces.COMPLETED:
@@ -238,8 +238,13 @@
         self.dispatchers = Dispatchers()
         self.dispatchers.__parent__ = self
 
-    def put(self, item, begin_after=None, begin_by=None):
+    def put(self, item, begin_after=None, begin_by=None,
+            error_log_level=None, retry_policy=None):
         item = zc.async.interfaces.IJob(item)
+        if error_log_level is not None:
+            item.error_log_level = error_log_level
+        if retry_policy is not None:
+            item.retry_policy = retry_policy
         if item.assignerUUID is not None:
             raise ValueError(
                 'cannot add already-assigned job')
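
The new put arguments are simply forwarded onto the job before it is queued.
A minimal sketch of calling it, assuming the usual zc.async setup (a queues
mapping in the database root under zc.async.interfaces.KEY with a default
queue named '', and the package's component registrations in place);
``send_digest`` is a hypothetical task, and passing 'callback' assumes the
named IRetries adapter sketched in job.py is registered:

    import logging

    import zc.async.interfaces

    def send_digest(): # hypothetical application task
        pass

    def enqueue(conn):
        queue = conn.root()[zc.async.interfaces.KEY]['']
        # error_log_level and retry_policy end up on the job itself, exactly
        # as if they had been set before calling put.
        return queue.put(send_digest, error_log_level=logging.WARNING,
                         retry_policy='callback')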

Modified: zc.async/branches/dev/src/zc/async/utils.py
===================================================================
--- zc.async/branches/dev/src/zc/async/utils.py	2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/utils.py	2008-05-20 01:08:43 UTC (rev 86848)
@@ -15,6 +15,8 @@
 import logging
 import sys
+import time
 
+import ZEO.Exceptions
+import ZODB.POSException
 import rwproperty
 import persistent
 import zc.dict
@@ -22,6 +24,15 @@
 import zope.bforest.periodic
 
 
+EXPLOSIVE_ERRORS = (SystemExit, KeyboardInterrupt)
+
+SYSTEM_ERRORS = (ZEO.Exceptions.ClientDisconnected,
+                 ZODB.POSException.POSKeyError)
+
+INITIAL_BACKOFF = 5
+MAX_BACKOFF = 60
+BACKOFF_INCREMENT = 5
+
 def simpleWrapper(name):
     # notice use of "simple" in function name!  A sure sign of trouble!
     def wrapper(self, *args, **kwargs):
@@ -236,3 +247,114 @@
 
     def __len__(self):
         return len(self._data)
+
+def never_fail(call, identifier, tm):
+    # forever for TransactionErrors; forever, with backoff, for anything else
+    trans_ct = 0
+    backoff_ct = 0
+    backoff = INITIAL_BACKOFF
+    res = None
+    while 1:
+        try:
+            res = call()
+            tm.commit()
+        except ZODB.POSException.TransactionError:
+            tm.abort()
+            trans_ct += 1
+            if not trans_ct % 5:
+                log.warning(
+                    '%d consecutive transaction errors while %s',
+                    trans_ct, identifier, exc_info=True)
+                res = None
+        except EXPLOSIVE_ERRORS:
+            tm.abort()
+            raise
+        except Exception, e:
+            if isinstance(e, SYSTEM_ERRORS):
+                level = logging.ERROR
+            else:
+                level = logging.CRITICAL
+            tm.abort()
+            backoff_ct += 1
+            if backoff_ct == 1:
+                log.log(level,
+                        'first error while %s; will continue in %d seconds',
+                        identifier, backoff, exc_info=True)
+            elif not backoff_ct % 5:
+                log.log(level,
+                        '%d consecutive errors while %s; '
+                        'will continue in %d seconds',
+                        backoff_ct, identifier, backoff, exc_info=True)
+            res = None
+            time.sleep(backoff)
+            backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
+        else:
+            return res
+
+def wait_for_system_recovery(call, identifier, tm):
+    # forever for TransactionErrors; forever, with backoff, for SYSTEM_ERRORS
+    trans_ct = 0
+    backoff_ct = 0
+    backoff = INITIAL_BACKOFF
+    res = None
+    while 1:
+        try:
+            res = call()
+            tm.commit()
+        except ZODB.POSException.TransactionError:
+            tm.abort()
+            trans_ct += 1
+            if not trans_ct % 5:
+                log.warning(
+                    '%d consecutive transaction errors while %s',
+                    trans_ct, identifier, exc_info=True)
+                res = None
+        except EXPLOSIVE_ERRORS:
+            tm.abort()
+            raise
+        except SYSTEM_ERRORS:
+            tm.abort()
+            backoff_ct += 1
+            if backoff_ct == 1:
+                log.error('first error while %s; will continue in %d seconds',
+                          identifier, backoff, exc_info=True)
+            elif not backoff_ct % 5:
+                log.error('%d consecutive errors while %s; '
+                          'will continue in %d seconds',
+                          backoff_ct, identifier, backoff, exc_info=True)
+            res = None
+            time.sleep(backoff)
+            backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
+        except:
+            log.error('Error while %s', identifier, exc_info=True)
+            tm.abort()
+            return zc.twist.Failure()
+        else:
+            return res
+
+def try_transaction_five_times(call, identifier, tm):
+    ct = 0
+    res = None
+    while 1:
+        try:
+            res = call()
+            tm.commit()
+        except ZODB.POSException.TransactionError:
+            tm.abort()
+            ct += 1
+            if ct >= 5:
+                log.error('Five consecutive transaction errors while %s',
+                          identifier, exc_info=True)
+                res = zc.twist.Failure()
+            else:
+                continue
+        except EXPLOSIVE_ERRORS:
+            tm.abort()
+            raise
+        except:
+            tm.abort()
+            log.error('Error while %s', identifier, exc_info=True)
+            res = zc.twist.Failure()
+        return res
\ No newline at end of file
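
To summarize the three helpers as written: try_transaction_five_times retries
only ZODB TransactionErrors and gives up after five, logging and returning a
zc.twist.Failure (any other exception becomes a Failure immediately);
wait_for_system_recovery retries TransactionErrors forever and SYSTEM_ERRORS
forever with backoff, turning anything else into a Failure; never_fail retries
everything except EXPLOSIVE_ERRORS forever, backing off for non-transaction
errors.  A minimal sketch of the shared calling convention; ``mark_recovered``
and ``root`` are hypothetical:

    import transaction

    import zc.async.utils

    def mark_recovered(root): # hypothetical persistent bookkeeping
        def work():
            root['recovered'] = True
        # never_fail only returns once ``work`` has committed successfully
        # (or an EXPLOSIVE_ERROR such as KeyboardInterrupt escapes).
        zc.async.utils.never_fail(work, 'marking recovery', transaction)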


