[Checkins] SVN: zc.async/trunk/ svn merge -r 86847:87583 svn+ssh://svn.zope.org/repos/main/zc.async/branches/dev

Gary Poster gary at zope.com
Thu Jun 19 21:18:18 EDT 2008


Log message for revision 87584:
  svn merge -r 86847:87583 svn+ssh://svn.zope.org/repos/main/zc.async/branches/dev

Changed:
  U   zc.async/trunk/buildout.cfg
  U   zc.async/trunk/setup.py
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/README.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/catastrophes.txt
  U   zc.async/trunk/src/zc/async/configure.py
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/dispatcher.txt
  U   zc.async/trunk/src/zc/async/interfaces.py
  U   zc.async/trunk/src/zc/async/job.py
  U   zc.async/trunk/src/zc/async/job.txt
  U   zc.async/trunk/src/zc/async/queue.py
  U   zc.async/trunk/src/zc/async/queue.txt
  U   zc.async/trunk/src/zc/async/testing.py
  U   zc.async/trunk/src/zc/async/utils.py

-=-
Modified: zc.async/trunk/buildout.cfg
===================================================================
--- zc.async/trunk/buildout.cfg	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/buildout.cfg	2008-06-20 01:18:18 UTC (rev 87584)
@@ -13,13 +13,13 @@
 [test]
 recipe = zc.recipe.testrunner
 eggs = zc.async
-defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1'.split()
+defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1 --auto-color'.split()
 working-directory = ${buildout:directory}
 
 [z3test]
 recipe = zc.recipe.testrunner
 eggs = zc.async [z3]
-defaults = "--tests-pattern z3tests --exit-with-status -1".split()
+defaults = "--tests-pattern z3tests --exit-with-status -1 --auto-color".split()
 
 
 [interpreter]

Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/setup.py	2008-06-20 01:18:18 UTC (rev 87584)
@@ -18,7 +18,6 @@
 try:
     import docutils
 except ImportError:
-    import warnings
     def validateReST(text):
         return ''
 else:
@@ -95,15 +94,16 @@
         'uuid',
         'zc.queue',
         'zc.dict>=1.2.1',
-        'zc.twist>=1.2',
+        'zc.twist>=1.3',
         'Twisted>=8.0.1', # 8.0 was setuptools compatible, 8.0.1 had bugfixes.
-        # note that Twisted builds with warnings, at least with py2.4.  It
+        # note that Twisted builds with warnings with py2.4.  It
         # seems to still build ok.
         'zope.bforest>=1.2',
         'zope.component',
         'zope.event',
         'zope.i18nmessageid',
         'zope.interface',
+        'zope.minmax',
         'zope.testing',
         'rwproperty',
         ],

Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-06-20 01:18:18 UTC (rev 87584)
@@ -1,6 +1,58 @@
 1.2 (unreleased)
 ================
 
+- made the log for finding an activated agent report the pertinent queue's oid
+  as an unpacked integer, rather than the packed string blob. Use
+  ``ZODB.utils.p64`` to convert back to an oid that the ZODB will recognize.
+
+- Bugfix: in failing a job, the job thought it was in its old agent, and the
+  ``fail`` call failed. This is now tested by the first example in new doctest
+  ``catastrophes.txt``.
+
+- jobs no longer default to a ``begin_by`` value of one hour after the
+  ``begin_after``.  The default now is no limit.
+
+- Made dispatcher much more robust to transaction errors and ZEO
+  ClientDisconnected errors.
+
+- Jobs now use an IRetryPolicy to decide what to do on failure within a job,
+  within the commit of the result, and if the job is interrupted.  This allows
+  support of transactional jobs, transactional jobs that critically must be
+  run to completion, and non-transactional jobs such as communicating with an
+  external service.
+
+- The default retry policy supports retries for ClientDisconnected errors,
+  transaction errors, and interruptions.
+
+- ``job.txt`` has been expanded significantly to show error handling and the
+  use of retry policies. New file ``catastrophes.txt`` shows handling of other
+  catastrophes, such as interruptions to polling.
+
+- job errors now go in the main zc.async.events log rather than in the
+  zc.async.trace log.  Successes continue to go in the trace log.
+
+- callback failures go to the main log as a CRITICAL error, by default.
+
+- ``handleInterrupt`` is the new protocol on jobs to inform them that they were
+  active in a dispatcher that is now dead. They either fail or reschedule,
+  depending on the associated IRetryPolicy for the job. If they reschedule,
+  the new time is given as a datetime or a timedelta. The job calls the agent's
+  ``reschedule`` method. If the timedelta is empty or negative, or the datetime
+  is earlier than now, the job is put back in the queue with a new ``putBack``
+  method on the queue. This is intended to be the opposite of ``claim``. Jobs
+  put in the queue with ``putBack`` will be pulled out before any others.
+
+- convert to using zope.minmax rather than locally defined ``Atom``.
+
+- Fix (and simplify) last_ping code so as to reduce unnecessarily writing the
+  state of the parent DispatcherAgents collection to the database whenever the
+  atom changed.
+
+- Depends on new release of zc.twist (1.3)
+
+1.1.1 (2008-05-14)
+==================
+
 - more README tweaks.
 
 - converted all reports from the dispatcher, including the monitor output,
@@ -21,15 +73,6 @@
   Fixed, which also means we need a new version of zope.bforest (1.2) for a new
   feature there.
 
-- made the log for finding an activated agent report the pertinent queue's oid
-  as an unpacked integer, rather than the packed string blob.  As mentioned
-  above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
-  recognize.
-
-- Bugfix: in failing a job, the job thought it was in its old agent, and the
-  ``fail`` call failed. This is now tested by the first example in new doctest
-  ``catastrophes.txt``.
-
 1.1 (2008-04-24)
 ================
 
@@ -49,7 +92,7 @@
 
 - Had the ThreadedDispatcherInstaller subscriber stash the thread on the
   dispatcher, so you can shut down tests like this:
-  
+
   >>> import zc.async.dispatcher
   >>> dispatcher = zc.async.dispatcher.get()
   >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
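
The first 1.2 entry in CHANGES.txt above says the log now reports the queue's
oid as an unpacked integer, and points to ``ZODB.utils.p64`` for converting
back. As a rough illustration, ZODB oids are 8-byte big-endian strings, so the
round trip can be sketched with the standard library alone. The helper names
below are stand-ins written for this note, not imports from ZODB, and the
sample oid value is invented.

```python
import struct

def pack_oid(value):
    """Pack an integer into an 8-byte big-endian oid (the p64 direction)."""
    return struct.pack(">Q", value)

def unpack_oid(oid):
    """Unpack an 8-byte big-endian oid into an integer (the u64 direction)."""
    return struct.unpack(">Q", oid)[0]

logged_oid = 30  # hypothetical integer copied from a log line
packed = pack_oid(logged_oid)

print(repr(packed))        # the packed form a ZODB connection would expect
print(unpack_oid(packed))  # back to the integer seen in the log
```

In a real session you would presumably call ``ZODB.utils.p64`` itself and hand
the result to the connection; the sketch only shows what the conversion amounts
to.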

Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/README.txt	2008-06-20 01:18:18 UTC (rev 87584)
@@ -1087,15 +1087,16 @@
 
     >>> t = transaction.begin()
     >>> job = queue.put(
-    ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
+    ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC),
+    ...     datetime.timedelta(hours=1))
     >>> transaction.commit()
     >>> reactor.wait_for(job)
     >>> job.result
-    <zc.twist.Failure zc.async.interfaces.AbortedError>
+    <zc.twist.Failure zc.async.interfaces.TimeoutError>
     >>> import sys
     >>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
     Traceback (most recent call last):
-    Failure: zc.async.interfaces.AbortedError:
+    Failure: zc.async.interfaces.TimeoutError:
 
 .. [#job] The Job class can take arguments and keyword arguments
     for the wrapped callable at call time as well, similar to Python
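
The README change above passes an explicit ``begin_by`` interval of one hour
because, per CHANGES.txt, jobs no longer get that default. The following is a
minimal sketch of the window check this implies, not zc.async's actual code:
the function name ``is_expired`` and its arguments are hypothetical, and
``datetime.timezone.utc`` stands in for the ``pytz.UTC`` used in the doctest.

```python
import datetime

UTC = datetime.timezone.utc

def is_expired(begin_after, begin_by, now):
    """Return True if the job's start window has already closed.

    begin_by is a timedelta measured from begin_after. None means "no
    limit", which is the new default described in CHANGES.txt.
    """
    if begin_by is None:
        return False
    return now > begin_after + begin_by

begin_after = datetime.datetime(2006, 7, 21, 12, tzinfo=UTC)
one_hour = datetime.timedelta(hours=1)
now = datetime.datetime(2008, 6, 20, tzinfo=UTC)

print(is_expired(begin_after, one_hour, now))  # explicit one-hour window
print(is_expired(begin_after, None, now))      # no begin_by limit
```

In the doctest above, the job whose window has closed ends up with a
``TimeoutError`` failure as its result.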

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-06-20 01:18:18 UTC (rev 87584)
@@ -1,15 +1,9 @@
-Bugs and improvements:
+For release
 
-- need retry tasks, particularly for callbacks
-
-- need CRITICAL logs for callbacks
-
-- when database went away, and then came back, async didn't come back.
-
 - be even more pessimistic about memory for saved polls and job info in
   dispatcher.
 
-For future versions:
+Bugs and improvements:
 
 - queues should be pluggable like agent with filter
 - show how to broadcast, maybe add conveniences
@@ -36,10 +30,17 @@
     Notice that all of the tests in this package use FileStorage.
   * callbacks should be very, very quick, and very reliable.  If you want to do
     something that might take a while, put another job in the queue
-  
 
+More docs:
+
+- custom retry policies, particularly for non-transactional tasks;
+
+- changing the default retry policy, per-process and per-agent; and
+
+- changing the default log level for queue jobs, callback jobs, and per-job.
+
+
 For some other package, maybe:
 
 - TTW Management and logging views, as in zasync (see goals in the "History"
   section of the README).
-  
\ No newline at end of file
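
The "More docs" item above asks for documentation of custom retry policies. As
a hedged sketch of what one might look like, the class below answers the three
questions that the catastrophes.txt diff in this message describes:
``interrupted()``, ``jobError(failure, data_cache)`` and
``commitError(failure, data_cache)``, each returning a boolean. Everything else
is an assumption made for illustration: the class name, the ``ConflictError``
stand-in, treating ``data_cache`` as a plain dict, and passing a bare exception
where zc.async would pass a ``Failure`` object.

```python
class ConflictError(Exception):
    """Stand-in for ZODB.POSException.ConflictError (not imported here)."""


class LimitedRetryPolicy:
    """Hypothetical policy: retry conflicts and interruptions a few times."""

    def __init__(self, max_retries=4):
        self.max_retries = max_retries
        self.interruptions = 0

    def interrupted(self):
        # Called when the job started but its process went away.
        self.interruptions += 1
        return self.interruptions <= self.max_retries

    def jobError(self, failure, data_cache):
        # Only transaction-style errors are retried; others fail at once.
        if not isinstance(failure, ConflictError):
            return False
        data_cache['conflicts'] = data_cache.get('conflicts', 0) + 1
        return data_cache['conflicts'] <= self.max_retries

    def commitError(self, failure, data_cache):
        # In this sketch, commit failures follow the same rule as job failures.
        return self.jobError(failure, data_cache)


policy = LimitedRetryPolicy(max_retries=4)
cache = {}
answers = [policy.jobError(ConflictError(), cache) for _ in range(5)]
print(answers)  # four retries are allowed, the fifth conflict gives up
print(policy.jobError(ValueError("boom"), {}))  # other errors are not retried
```

A policy for a non-transactional job, such as one talking to an external
service, would simply return ``False`` from all three methods and leave the
fallout to callbacks.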

Modified: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/catastrophes.txt	2008-06-20 01:18:18 UTC (rev 87584)
@@ -1,59 +1,485 @@
-Catastrophes
-============
+Recovering from Catastrophes
+============================
 
-Sometimes bad things happen in the course of processing tasks.  You might have
-a MemoryError while processing your main job, or some other failure might
-happen.  That's bad enough.  Of course, you can register some callbacks to
-handle the error, to do what you need to recover.
+------------
+Introduction
+------------
 
-But then what if the callback itself fails? Perhaps the situation that caused
-the main job to fail with a MemoryError will let the callback start, but not
-complete.  Then when a sibling dispatcher handles the incomplete job, the
-callback will fail.
+Sometimes bad things happen in the course of processing tasks. What might go
+wrong? How does zc.async handle these errors? What are your responsibilities?
 
-You, the user, do have some responsibilities. Callbacks should be very fast and
-light. If you want to do something that takes a long time, or might take a long
-time, have your callback put a new job in a queue for the long job. The
-callback itself should then complete, quickly out of the way. 
+First, what might go wrong?
 
-However, zc.async also has important responsibilities.
+- zc.async could have a problem while polling for jobs.  We'll call this a
+  "polling exception."
 
-This document examines catastrophes like the ones outlined above, to show how
-zc.async handles them, and how you can configure zc.async for these situations.
-Other documents in zc.async, such as the "Dead Dispatchers" section of
-queue.txt, look at this with some isolation and stubs; this document uses a
-complete zc.async set up to examine the system holistically [#setUp]_.
+- zc.async could have a problem while performing a particular job.  We'll call
+  this a "job-related exception."
 
-These are the scenarios we'll contemplate:
+For the purpose of this discussion, we will omit the possibility that zc.async
+has a bug. That is certainly a possibility, but the recovery story is not
+predictable, and if we knew of a bug, we'd try to fix it, rather than discuss
+it here!
 
-- The system has a single dispatcher. The dispatcher is working on a job with a
-  callback. The dispatcher dies, and then restarts, cleaning up.  We'll do two
-  variants, one with a graceful shutdown and one with a hard crash.
+We'll discuss both polling exceptions and job-related exceptions, then drill
+down into some specific scenarios. This will illuminate how your code and
+zc.async's can work together to handle them.
 
-- The system has two dispatchers. One dispatcher is working on a job with a
-  callback, and then dies. The other dispatcher cleans up.
+Polling Exceptions
+------------------
 
-- The system has a single dispatcher.  The dispatcher is working on a job, and
-  successfully completes it.  The callback begins, and then fails.
+Polling exceptions are, at least in theory, the least of your worries. You
+shouldn't have to worry about them; and if you do, it is probably a basic
+configuration problem that you need to address, such as making sure that the
+dispatcher process has access to the needed databases and software; or making
+sure that the dispatcher process is run by daemonizing software that will
+restart it if needed, such as zdaemon (http://pypi.python.org/pypi/zdaemon) or
+supervisor (http://supervisord.org/).
 
-- The system has a single dispatcher.  The dispatcher is working on a job, and
-  successfully completes it.  The callback begins, and then the dispatcher
-  dies.
+zc.async is largely responsible for dealing with polling exceptions. What does
+it have to handle?
 
-- The system has a single dispatcher.  The database goes away, and then comes
-  back.
+- The process running the poll ends, perhaps in the middle of a poll.
 
--------------------------------------------------
-Dispatcher Dies Gracefully While Performing a Job
--------------------------------------------------
+- zc.async cannot commit a transaction during the poll, for instance because of
+  a ConflictError, or because the database is unavailable.
 
+What needs to happen to handle these problems?
+
+Process Ends
+............
+
+If the process ends, your daemonizing front-end (zdaemon, supervisor, etc.)
+needs to restart it. The ZODB will discard incomplete transaction data, if any.
+
+The only thing a zc.async dispatcher needs to handle is cleanup.
+
+- Ideally it will be able to deactivate its record in the ZODB during the
+  process shutdown.
+
+- Instead, if it was a "hard crash" that didn't allow deactivation, a sibling
+  dispatcher will realize that the dispatcher is down and deactivate it.
+
+- Or, finally, if it was a hard crash without a sibling, and the daemon
+  restarts a process for the original dispatcher instance, the new process
+  needs to realize that the old process is dead, not competing with it.
+
+Transaction Error
+.................
+
+If the poll gets a conflict error, it should simply abort and retry the poll,
+forever, with a small backoff.
+
+If the database goes away (perhaps the ZEO server goes down for a bit, and the
+ZEO client to which the dispatcher is connected is trying to reconnect) it
+should gracefully try to wait for the database to return, and resume when it
+does.
+
+Other, more dramatic errors, such as POSKey errors, are generally considered to
+be out of zc.async's domain and control. It should ideally continue to try to
+resume as long as the process is alive, in case somehow the situation improves,
+but this may be difficult and the expectations for zc.async's recovery are
+lower than with ConflictErrors and ClientDisconnected errors.
+
+Summary of Polling Exceptions
+.............................
+
+To repeat, then, polling exceptions have two basic scenarios.
+
+If a dispatcher process ends, it needs to deactivate its record in the ZODB, or
+let another process know to deactivate it.
+
+If a ZODB.POSException.ConflictError occurs, retry forever with a small
+backoff; or if ZEO.Exceptions.ClientDisconnected occurs, retry forever with a
+small backoff, waiting for the database to come back.
+
+Most anything else will ideally keep zc.async attempting to re-poll, but it may
+not happen: expectations are lower.
+
+Job-Related Exceptions
+----------------------
+
+What about job-related exceptions? Responsibility for handling job-related
+exceptions is shared between your code and zc.async's.  What might happen?
+
+- Your job might fail internally.
+
+- The process running your task ends before completing your task.
+
+- zc.async cannot commit a transaction after your task completes, for instance
+  because of a ConflictError, or because the database is unavailable.
+
+What should occur to handle these problems?
+
+Job Fails
+.........
+
+As discussed elsewhere, if your job fails in your own code, this is mostly
+your responsibility. You should handle possible errors both within your job's
+code, and in callbacks, as appropriate.
+
+The other tool at your disposal for this situation, as with others below, is a
+retry policy. Retry policies let you determine what zc.async should do when
+your job fails. The default retry policy for job failures (as well as commit
+failures, below) is that transaction errors, such as conflict errors, are
+tried up to five times (four retries), and a ZEO ClientDisconnected error is
+retried forever with a backoff. You can customize these.
+
+Other than supporting these tools, zc.async's only other responsibility is to
+report.
+
+By default, zc.async will log a failure of a job entered in a queue at the
+"ERROR" level in the ``zc.async.events`` log, and it will log a failure of a
+callback or other internal job at the "CRITICAL" level. This can be controlled
+per-process and per-job, as we'll see below. These tracebacks include
+information about the local and global variables for each frame in the stack,
+which can be useful to deduce the problem that occurred.
+
+zc.async also includes a ``Failure`` object on the job as a result, to let you
+react to the problem in a callback, and analyze it later.  This is discussed in
+detail in other documents.
+
+Process Ends
+............
+
+If a process ends while it is performing a job, that is similar, in large part,
+to the possibility of the process ending during a poll: we need to restart
+the process, and realize that we had started the job. But should we restart the
+job, or abort it?
+
+Answering this question is a matter of policy, and requires knowing what each
+job does.
+
+Generally, if a job is fully transactional, such as writing something to the
+ZODB, and the job has not timed out yet, you'll want to restart it. You might
+want to restart some reasonably large number of times, and then suspect that,
+since you can't seem to finish the job, maybe the job is causing the process to
+die, and you should abort.  Or perhaps you want to restart forever.
+
+If the job isn't transactional, such as communicating with an external service,
+you might want to abort the job, and set up some callbacks to handle the
+fallout.
+
+As we'll see below, zc.async defaults to guessing that jobs placed directly in
+a queue are transactional, and can be tried up to ten times; and that jobs
+used as callbacks are also transactional, and can be tried until they
+succeed.  The defaults can be changed and the behavior of an individual
+job can be changed.
+
+These settings are controlled with a RetryPolicy, discussed below.
+
+Transaction Error
+.................
+
+Handling transaction errors after processing a job is also similar to the
+handling of transaction errors for polling exceptions. ConflictErrors and
+ClientDisconnected errors should often cause jobs to be aborted and restarted.
+However, if the job is not transactional, such as communicating with an
+external service, a simple abort and retry may be hazardous. Also, many jobs
+should be stopped if they retry on ConflictError more than some number of
+times--a heuristic bellwether--with the logic that they may be simply doing
+something too problematic, and they are blocking other tasks from starting. But
+other jobs should be retried until they complete.
+
+As mentioned above, zc.async defaults to guessing that jobs are transactional.
+ClientDisconnected errors are retried forever, with a small backoff. Jobs
+placed in a queue retry transaction errors, such as ConflictErrors, four times,
+while callbacks retry them forever. The defaults can be changed and the
+behavior of an individual job can be changed, using the RetryPolicy described
+below.
+
+Summary of Job-Related Exceptions
+.................................
+
+If an exception occurs in your job's code, zc.async will log it as an ERROR if
+it is a main queue job and as CRITICAL if it is a callback; and it will make
+the result of the call a ``Failure`` with error information, as shown elsewhere.
+Everything else is your responsibility, to be handled with try:except or
+try:finally blocks in your code, callbacks, or custom RetryPolicies.
+
+Process death, conflict errors, and ``ClientDisconnected`` errors all may need
+to be handled differently for different jobs. zc.async has a default policy for
+jobs placed in a queue, and for callback jobs. The default policy, a
+RetryPolicy, can be changed and can be set explicitly per-job.
+
+Your Responsibilities
+---------------------
+
+As the author of a zc.async job, your responsibilities, then, are to handle
+your own exceptions; and to make sure that the retry policy for each job is
+appropriate.  This is controlled with an IRetryPolicy, as shown below.
+
+As someone configuring a running dispatcher, you need to make sure that you
+give the dispatcher the necessary access to databases and software to perform
+your jobs, and you need to review (and rotate!) your logs.
+
+zc.async's Responsibilities
+---------------------------
+
+zc.async needs to keep polling robust in the face of restarts, ConflictErrors
+and ClientDisconnected errors. It needs to give your code a chance to decide
+what to do in these circumstances, and log your errors.
+
+Retry Policies
+--------------
+
+The rest of the document uses scenarios to illustrate how zc.async handles
+errors, and how you might want to configure retry policies.
+
+What is a retry policy?  It is used in three circumstances.
+
+- When the job starts but fails to complete because the system is interrupted,
+  the job will try to call ``retry_policy.interrupted()`` to get a boolean as
+  to whether the job should be retried.
+
+- When the code the job ran fails, the job will try to call
+  ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to whether
+  the job should be retried.
+
+- When the commit fails, the job will try to call
+  ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
+  whether the job should be retried.
+
+Why does this need to be a policy?  Can't it be a simpler arrangement?
+
+The heart of the problem is that different jobs need different error
+resolutions.
+
+In some cases, jobs may not be fully transactional.  For instance, the job
+may be communicating with an external system, such as a credit card system.
+The retry policy here should typically be "never": perhaps a callback should be
+in charge of determining what to do next.
+
+If a job is fully transactional, it can be retried.  But even then the desired
+behavior may differ.
+
+- In typical cases, some errors should simply cause a failure, while other
+  errors, such as database conflict errors, should cause a limited number of
+  retries.
+
+- In some jobs, conflict errors should be retried forever, because the job must
+  be run to completion or else the system should fall over. Callbacks that try
+  to handle errors themselves may take this approach, for instance.
+
+zc.async currently ships with three retry policies.
+
+1.  The default, appropriate for most fully transactional jobs, is the
+    zc.async.job.RetryCommonFourTimes.
+
+2.  The other available (pre-written) option for transactional jobs is
+    zc.async.job.RetryCommonForever. Callbacks will get this policy by
+    default.
+
+Both of these policies retry ZEO disconnects forever; and interrupts and
+transaction errors such as conflicts either four times (for a total of five
+attempts) or forever, respectively.
+
+3.  The last retry policy is zc.async.job.NeverRetry.  This is appropriate for
+    non-transactional jobs. You'll still typically need to handle errors in
+    your callbacks.
+
+Scenarios
+---------
+
+We'll examine polling error scenarios and job error scenarios.
+
+- Polling errors
+
+  * The system is polling and gets a ConflictError.
+
+  * The system is polling and gets a ClientDisconnected error.
+
+- Job errors
+
+  * A worker process is working on a job with the default retry policy. The
+    process dies gracefully and restarts.
+
+  * Like the previous scenario, a worker process is working on a job with the
+    default retry policy. The process crashes hard (does not die gracefully)
+    and restarts.
+
+  * Like the previous scenario, a worker process is working on a job with the
+    default retry policy. The process crashes hard (does not die gracefully)
+    and a sibling notices and takes over.
+
+  * A worker process is working on a job with the default retry policy and gets
+    an error during the job or the commit.
+
+-------------------------
+Scenarios: Polling Errors
+-------------------------
+
+ConflictError
+-------------
+
+A common place for a conflict error is with two dispatchers trying to claim the
+same job from the queue.  This example will mimic that situation.
+
+Imagine we have a full set up with a dispatcher, agent, and queue. [#setUp]_
+We'll actually replace the agent's chooser with one that behaves badly: it
+blocks, waiting for our lock.
+
+    >>> import threading
+    >>> lock1 = threading.Lock()
+    >>> lock2 = threading.Lock()
+    >>> lock1.acquire()
+    True
+    >>> lock2.acquire()
+    True
+    >>> def acquireLockAndChooseFirst(agent):
+    ...     res = agent.queue.claim()
+    ...     if res is not None:
+    ...         lock2.release()
+    ...         lock1.acquire()
+    ...     return res
+    ...
+    >>> import zc.async.instanceuuid
+    >>> import zc.async.interfaces
+    >>> import zc.async.testing
+    >>> import zc.async.dispatcher
+    >>> import pprint
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher, 0))
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
+    >>> import transaction
+    >>> _ = transaction.begin()
+    >>> queues = root[zc.async.interfaces.KEY]
+    >>> queue = queues['']
+    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+    >>> agent = da['main']
+    >>> agent.chooser = acquireLockAndChooseFirst
+    >>> def returnSomething():
+    ...     return 42
+    ...
+    >>> job = queue.put(returnSomething)
+    >>> transaction.commit()
+
+Now, when the agent tries to get our job, we'll start and commit another
+transaction that removes it from the queue.  This will generate a conflict
+error for the poll's thread and transaction, because it cannot also remove the
+same job.
+
+    >>> lock2.acquire()
+    True
+    >>> _ = transaction.begin()
+    >>> job is queue.pull()
+    True
+    >>> transaction.commit()
+    >>> lock1.release()
+
+However, the ConflictError is handled, and polling continues.
+
+    >>> _ = transaction.begin()
+    >>> import zc.async.agent
+    >>> agent.chooser = zc.async.agent.chooseFirst
+    >>> transaction.commit()
+    >>> import zc.async.dispatcher
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> import zc.async.testing
+    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
+
+And if we put the job back, it will be performed.
+
+    >>> job is queue.put(job)
+    True
+    >>> transaction.commit()
+    >>> zc.async.testing.wait_for_result(job)
+    42
+
+Client Disconnected
+-------------------
+
+The story is very similar if the ZEO connection goes away for a while.  We'll
+mimic a ZEO ClientDisconnected error by monkeypatching
+transaction.TransactionManager.commit.
+
+    >>> lock1.locked()
+    True
+    >>> lock2.locked()
+    True
+
+    >>> agent.chooser = acquireLockAndChooseFirst
+    >>> job = queue.put(returnSomething)
+    >>> transaction.commit()
+
+    >>> lock2.acquire()
+    True
+    >>> import ZEO.Exceptions
+    >>> def commit(self):
+    ...     raise ZEO.Exceptions.ClientDisconnected()
+    ...
+    >>> import transaction
+    >>> old_commit = transaction.TransactionManager.commit
+    >>> transaction.TransactionManager.commit = commit
+    >>> import time
+    >>> sleep_requests = []
+    >>> def sleep(i):
+    ...     sleep_requests.append(i)
+    ...
+    >>> old_sleep = time.sleep
+    >>> time.sleep = sleep
+    >>> agent.chooser = zc.async.agent.chooseFirst
+    >>> transaction.commit()
+    >>> lock1.release()
+    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [(..., 'unnamed')],
+                   'size': 3}}}
+    >>> transaction.TransactionManager.commit = old_commit
+    >>> zc.async.testing.wait_for_result(job)
+    42
+    >>> bool(sleep_requests)
+    True
+
+Here's another variant that mimics being unable to read the storage during a
+poll, and then recovering.
+
+    >>> error_raised = False
+    >>> def raiseDisconnectedThenChooseFirst(agent):
+    ...     global error_raised
+    ...     if not error_raised:
+    ...         error_raised = True
+    ...         raise ZEO.Exceptions.ClientDisconnected()
+    ...     return agent.queue.claim()
+    >>> agent.chooser = raiseDisconnectedThenChooseFirst
+    >>> def returnSomething():
+    ...     return 42
+    ...
+    >>> job = queue.put(returnSomething)
+    >>> transaction.commit()
+    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
+    {'': {'main': {'active jobs': [],
+                   'error': <zc.twist.Failure ...ClientDisconnected>,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
+    >>> zc.async.testing.wait_for_result(job)
+    42
+
+-----------------------------
+Scenarios: Job-Related Errors
+-----------------------------
+
+Graceful Shutdown During Job
+----------------------------
+
 First let's consider how a failed job with a callback or two is handled when
 the dispatcher dies.
 
 Here we start a job.
 
     >>> import zope.component
-    >>> import threading
     >>> import transaction
     >>> import zc.async.interfaces
     >>> import zc.async.testing
@@ -63,29 +489,35 @@
     >>> lock = threading.Lock()
     >>> lock.acquire()
     True
+    >>> fail_flag = True
     >>> def wait_for_me():
-    ...     lock.acquire()
-    ...     lock.release() # so we can use the same lock again later
-    ...     raise SystemExit() # this will cause the worker thread to exit
+    ...     global fail_flag
+    ...     if fail_flag:
+    ...         fail_flag = False
+    ...         lock.acquire()
+    ...         lock.release() # so we can use the same lock again later
+    ...         raise SystemExit() # this will cause the worker thread to exit
+    ...     else:
+    ...         return 42
     ...
-    >>> def handle_error(result):
-    ...     return '...I handled the error...'
+    >>> def handle_result(result):
+    ...     return 'I got result %r' % (result,)
     ...
     >>> job = queue.put(wait_for_me)
-    >>> callback_job = job.addCallbacks(failure=handle_error)
+    >>> callback_job = job.addCallback(handle_result)
     >>> transaction.commit()
     >>> dispatcher = zc.async.dispatcher.get()
     >>> poll = zc.async.testing.get_poll(dispatcher)
     >>> wait_for_start(job)
 
-In this scenario, ``wait_for_me`` is a job that will "unexpectedly" be lost
-while the dispatcher stops working.  ``handle_error`` is the hypothetical
-handler that should be called if the ``wait_for_me`` job fails.
+In this scenario, ``wait_for_me`` is a job that, the first time it is run, will
+"unexpectedly" be lost while the dispatcher stops working. ``handle_result``
+will simply show us that callbacks will be called successfully.
 
 The job has started. Now, the dispatcher suddenly dies without the thread
 performing ``wait_for_me`` getting a chance to finish. For our first example,
 let's give the dispatcher a graceful exit. The dispatcher gets a chance to
-clean up its dispatcher agents, and job.fail() goes into the queue.
+clean up its dispatcher agents, and job.handleInterrupt() goes into the queue.
 
     >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
     >>> wait_to_deactivate(dispatcher)
@@ -94,37 +526,44 @@
     True
     >>> len(queue)
     1
-    >>> fail_job = queue[0]
-    >>> fail_job
-    <zc.async.job.Job (oid 51, db 'unnamed') ``zc.async.job.Job (oid 30, db 'unnamed') :fail()``>
-    >>> queue[0].callable
-    <bound method Job.fail of <zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.wait_for_me()``>>
+    >>> interrupt_job = queue[0]
+    >>> interrupt_job # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[0].callable # doctest: +ELLIPSIS
+    <bound method Job.handleInterrupt of <...Job ... ``...wait_for_me()``>>
 
-Now when the process starts back up again, our callback will be performed.
+Now when the process starts back up again, ``handleInterrupt`` asks the
+default retry policy what should be done. The policy requests a retry, so the
+job is put back in the queue and is called again normally.
 
     >>> old_dispatcher = dispatcher
     >>> zc.async.dispatcher.clear()
     >>> zc.async.subscribers.ThreadedDispatcherInstaller(
     ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
     >>> dispatcher = zc.async.dispatcher.get()
-    >>> zc.async.testing.wait_for_result(fail_job)
-    >>> job.status == zc.async.interfaces.COMPLETED
-    True
-    >>> job.result
-    <zc.twist.Failure zc.async.interfaces.AbortedError>
+    >>> zc.async.testing.wait_for_result(interrupt_job)
+
+Now we need to wait for the job.
+
+    >>> zc.async.testing.wait_for_result(job)
+    42
     >>> callback_job.status == zc.async.interfaces.COMPLETED
     True
     >>> callback_job.result
-    '...I handled the error...'
+    'I got result 42'
 
-So, our callback had a chance to do whatever it thought appropriate--in this
-case, simply returning a string--once the dispatcher got back online
-[#cleanup1]_.
+The job now has a retry policy. Some of its values are not currently part of
+an interface, but they are still worth showing here.
 
-------------------------------------------------
-Dispatcher Crashes "Hard" While Performing a Job
-------------------------------------------------
+    >>> policy = job.getRetryPolicy()
+    >>> policy.data.get('interruptions')
+    1
 
+This shows that the policy registered one interruption. [#cleanup1]_
+
+Hard Crash During Job
+---------------------
+
 Our next catastrophe changes only one aspect of the previous one: the
 dispatcher does not stop gracefully, and does not have a chance to clean up its
 active jobs.  It is a "hard" crash.
@@ -136,8 +575,9 @@
 
     >>> lock.acquire()
     True
+    >>> fail_flag = True
     >>> job = queue.put(wait_for_me)
-    >>> callback_job = job.addCallbacks(failure=handle_error)
+    >>> callback_job = job.addCallback(handle_result)
     >>> transaction.commit()
     >>> dispatcher = zc.async.dispatcher.get()
     >>> poll = zc.async.testing.get_poll(dispatcher)
@@ -156,11 +596,12 @@
 where ``UUID`` is the uuid of that dispatcher.
 
 The ``DispatcherAgents`` object has four pertinent attributes:
-``ping_interval``, ``ping_death_interval``, ``last_ping``, and ``dead``. About
-every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is supposed
-to write a ``datetime`` to ``last_ping``. If the ``last_ping`` plus the
-``ping_death_interval`` (also a ``timedelta``) is older than now, the
-dispatcher is considered to be ``dead``, and old jobs should be cleaned up.
+``ping_interval``, ``ping_death_interval``, ``last_ping.value``, and ``dead``.
+About every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is
+supposed to write a ``datetime`` to ``last_ping.value``. If the
+``last_ping.value`` plus the ``ping_death_interval`` (also a ``timedelta``) is
+older than now, the dispatcher is considered to be ``dead``, and old jobs
+should be cleaned up.
 
 The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
 defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
@@ -221,13 +662,16 @@
     ``zc.async.dispatcher.get().activated = False``.  To stop polls
     permanently, don't start a zc.async.dispatcher!)
 
-
 To speed up the realization of our dispatcher that the previous activation is
 ``dead``, we'll set the ping_death_interval to just one second.
 
     >>> _ = transaction.begin()
     >>> da.dead
     False
+    >>> job in da['main']
+    True
+    >>> len(queue)
+    0
     >>> import datetime
     >>> da.ping_death_interval = datetime.timedelta(seconds=1)
     >>> da.dead
@@ -237,41 +681,48 @@
     >>> transaction.commit()
 
 After the next poll, the dispatcher will have cleaned up its old tasks in the
-same way we saw in the previous example. The job's ``fail`` method will be
-called, and the callback will be performed.  The DispatcherAgents object is
-no longer dead, because it is tied to the new instance of the dispatcher.
+same way we saw in the previous example. The job's ``handleInterrupt`` method
+will be called, and the job will be put back in the queue to be retried. The
+DispatcherAgents object is no longer dead, because it is tied to the new
+instance of the dispatcher.
 
     >>> poll = zc.async.testing.get_poll(dispatcher)
-    >>> _ = transaction.begin()
+    >>> def wait_for_pending(job):
+    ...     for i in range(60):
+    ...         t = transaction.begin()
+    ...         if job.status == zc.async.interfaces.PENDING:
+    ...             break
+    ...         time_sleep(0.1)
+    ...     else:
+    ...         assert False, 'job never pending'
+    >>> wait_for_pending(job)
     >>> job in da['main']
     False
     >>> bool(da.activated)
     True
     >>> da.dead
     False
-    >>> fail_job = job.parent
-    >>> fail_job
-    <zc.async.job.Job (oid 78, db 'unnamed') ``zc.async.job.Job (oid 57, db 'unnamed') :fail()``>
+    >>> queue[0] is job
+    True
 
-Let's see it happen.
+Now we need to wait for the job.
 
-    >>> zc.async.testing.wait_for_result(fail_job)
-    >>> job.status == zc.async.interfaces.COMPLETED
-    True
-    >>> job.result
-    <zc.twist.Failure zc.async.interfaces.AbortedError>
+    >>> zc.async.testing.wait_for_result(job)
+    42
     >>> callback_job.status == zc.async.interfaces.COMPLETED
     True
     >>> callback_job.result
-    '...I handled the error...'
+    'I got result 42'
+    >>> policy = job.getRetryPolicy()
+    >>> policy.data.get('interruptions')
+    1
 
 The dispatcher cleaned up its own "hard" crash.
 
 [#cleanup2]_
 
------------------------------------------------------------------
-Dispatcher Crashes "Hard" While Performing a Job, Sibling Resumes
------------------------------------------------------------------
+Hard Crash During Job with Sibling Recovery
+-------------------------------------------
 
 Our next catastrophe is the same as the one before, except, after one
 dispatcher's hard crash, another dispatcher is around to clean up the dead
@@ -285,8 +736,9 @@
 
     >>> lock.acquire()
     True
+    >>> fail_flag = True
     >>> job = queue.put(wait_for_me)
-    >>> callback_job = job.addCallbacks(failure=handle_error)
+    >>> callback_job = job.addCallback(handle_result)
     >>> transaction.commit()
     >>> dispatcher = zc.async.dispatcher.get()
     >>> poll = zc.async.testing.get_poll(dispatcher)
@@ -359,8 +811,8 @@
 
 After the second dispatcher gets a poll--a chance to notice--it will have
 cleaned up the first dispatcher's old tasks in the same way we saw in the
-previous example.  The job's ``fail`` method will be called, and the callback
-will be performed.
+previous example.  The job's ``handleInterrupt`` method will be called, which
+in this case will put it back in the queue to be claimed and performed.
 
     >>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
     >>> _ = transaction.begin()
@@ -370,48 +822,31 @@
     False
     >>> da.dead
     True
-    >>> fail_job = job.parent
-    >>> fail_job
-    <zc.async.job.Job (oid 121, db 'unnamed') ``zc.async.job.Job (oid 84, db 'unnamed') :fail()``>
-    >>> zc.async.testing.wait_for_result(fail_job)
-    >>> job.status == zc.async.interfaces.COMPLETED
+    >>> wait_for_pending(job)
+    >>> queue[0] is job
     True
-    >>> job.result
-    <zc.twist.Failure zc.async.interfaces.AbortedError>
+
+Now we need to wait for the job.
+
+    >>> zc.async.testing.wait_for_result(job)
+    42
     >>> callback_job.status == zc.async.interfaces.COMPLETED
     True
     >>> callback_job.result
-    '...I handled the error...'
+    'I got result 42'
 
 The sibling, then, was able to clean up the mess left by the "hard" crash of
 the first dispatcher.
 
 [#cleanup3]_
 
---------------
-Callback Fails
---------------
+Other Job-Related Errors
+------------------------
 
--------------------------------
-Dispatcher Dies During Callback
--------------------------------
+Other problems--errors when performing or committing jobs--are handled within
+jobs, getting the decisions from retry policies as described above.  These
+are demonstrated in the job.txt document.
 
-------------------------------
-Database Disappears For Awhile
-------------------------------
-
----------------------------------------------
-Other Catastrophes, And Your Responsibilities
----------------------------------------------
-
-There are some catastrophes from which there are no easy fixes like these.  For
-instance, imagine you have communicated with an external system, and gotten a
-reply that you have successfully made a transaction there, but then the
-dispatcher dies, or the database disappears, before you have a chance to commit
-the local transaction recording the success.  Your code needs to see that
-
-...multidatabase...
-
 .. ......... ..
 .. Footnotes ..
 .. ......... ..
@@ -421,8 +856,8 @@
     >>> import ZODB.FileStorage
     >>> storage = ZODB.FileStorage.FileStorage(
     ...     'main.fs', create=True)
-    >>> from ZODB.DB import DB 
-    >>> db = DB(storage) 
+    >>> from ZODB.DB import DB
+    >>> db = DB(storage)
     >>> conn = db.open()
     >>> root = conn.root()
     >>> import zc.async.configure
@@ -440,13 +875,14 @@
     >>> import transaction
     >>> _ = transaction.begin()
 
-    >>> import time
+    >>> from time import sleep as time_sleep # we import in this manner so
+    ... # that our testing monkey-patch of time.sleep does not affect our tests
     >>> def wait_for_start(job):
     ...     for i in range(60):
     ...         t = transaction.begin()
     ...         if job.status == zc.async.interfaces.ACTIVE:
     ...             break
-    ...         time.sleep(0.1)
+    ...         time_sleep(0.1)
     ...     else:
     ...         assert False, 'job never started'
 
@@ -454,7 +890,7 @@
     ...     for i in range(60):
     ...         if dispatcher.activated == False:
     ...             break
-    ...         time.sleep(0.1)
+    ...         time_sleep(0.1)
     ...     else:
     ...         assert False, 'dispatcher never deactivated'
 
@@ -490,3 +926,4 @@
     >>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
     >>> alt_dispatcher.thread.join(3)
     >>> alt_dispatcher.dead_pools[0].threads[0].join(3)
+    >>> time.sleep = old_sleep
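
Editor's sketch (not part of the commit): the "Hard Crash During Job" text above says a dispatcher is considered ``dead`` when ``last_ping.value`` plus ``ping_death_interval`` is older than now. The comparison can be illustrated with plain ``datetime`` objects. The function name ``is_dead`` and the fixed ``now`` value are assumptions made for illustration only; the real check lives on the ``DispatcherAgents`` object and may differ in detail.

```python
import datetime


def is_dead(last_ping, ping_death_interval, now):
    """Hypothetical helper: True when last_ping + ping_death_interval < now."""
    return last_ping + ping_death_interval < now


# A fixed "now" keeps the example deterministic.
now = datetime.datetime(2008, 6, 20, 1, 18, 18)
last_ping = now - datetime.timedelta(seconds=45)

# With the default 60-second death interval, a 45-second-old ping is alive.
print(is_dead(last_ping, datetime.timedelta(seconds=60), now))

# Shrinking the interval to one second, as the doctest does, makes it dead.
print(is_dead(last_ping, datetime.timedelta(seconds=1), now))
```

This is why the doctest can force recovery quickly: lowering ``ping_death_interval`` changes the outcome of the comparison without touching the stored ping time.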

Modified: zc.async/trunk/src/zc/async/configure.py
===================================================================
--- zc.async/trunk/src/zc/async/configure.py	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/configure.py	2008-06-20 01:18:18 UTC (rev 87584)
@@ -56,4 +56,4 @@
 def base():
     # see comment in ``minimal``, above
     minimal()
-    zope.component.provideAdapter(zc.twist.connection)
\ No newline at end of file
+    zope.component.provideAdapter(zc.twist.connection)

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-06-20 01:18:18 UTC (rev 87584)
@@ -21,6 +21,7 @@
 import twisted.python.failure
 import twisted.internet.defer
 import ZODB.POSException
+import ZEO.Exceptions
 import ZODB.utils
 import BTrees
 import transaction
@@ -55,7 +56,7 @@
 
     def __init__(self):
         self._event = threading.Event()
-    
+
     def setResult(self, value):
         self.result = value
         self._event.set()
@@ -119,6 +120,9 @@
 class AgentThreadPool(object):
 
     _size = 0
+    initial_backoff = 5
+    incremental_backoff = 5
+    maximum_backoff = 60
 
     def __init__(self, dispatcher, name, size):
         self.dispatcher = dispatcher
@@ -134,34 +138,54 @@
         local.dispatcher = self.dispatcher
         conn = self.dispatcher.db.open()
         try:
-            job = self.queue.get()
-            while job is not None:
-                identifier, dbname, info = job
+            job_info = self.queue.get()
+            while job_info is not None:
+                identifier, dbname, info = job_info
                 info['thread'] = thread.get_ident()
                 info['started'] = datetime.datetime.utcnow()
                 zc.async.utils.tracelog.info(
                     'starting in thread %d: %s',
                     info['thread'], info['call'])
+                backoff = self.initial_backoff
+                conflict_retry_count = 0
                 try:
-                    transaction.begin()
-                    if dbname is None:
-                        local_conn = conn
-                    else:
-                        local_conn = conn.get_connection(dbname)
-                    job = local_conn.get(identifier)
-                    local.job = job
+                    while 1:
+                        try:
+                            transaction.begin()
+                            if dbname is None:
+                                local_conn = conn
+                            else:
+                                local_conn = conn.get_connection(dbname)
+                            job = local_conn.get(identifier)
+                            # this setstate should trigger any initial problems
+                            # within the try/except retry structure here.
+                            local_conn.setstate(job)
+                            local.job = job
+                        except ZEO.Exceptions.ClientDisconnected:
+                            zc.async.utils.log.info(
+                                'ZEO client disconnected while trying to '
+                                'get job %d in db %s; retrying in %d seconds',
+                                ZODB.utils.u64(identifier), dbname or '',
+                                backoff)
+                            time.sleep(backoff)
+                            backoff = min(self.maximum_backoff,
+                                          backoff + self.incremental_backoff)
+                        except ZODB.POSException.TransactionError:
+                            # continue, i.e., try again
+                            conflict_retry_count += 1
+                            if (conflict_retry_count == 1 or
+                                not conflict_retry_count % 5):
+                                zc.async.utils.log.warning(
+                                    '%d transaction error(s) while trying to '
+                                    'get job %d in db %s',
+                                    conflict_retry_count,
+                                    ZODB.utils.u64(identifier), dbname or '',
+                                    exc_info=True)
+                            # now ``while 1`` loop will continue, to retry
+                        else:
+                            break
                     try:
                         job() # this does the committing and retrying, largely
-                    except ZODB.POSException.TransactionError:
-                        transaction.abort()
-                        while 1:
-                            job.fail()
-                            try:
-                                transaction.commit()
-                            except ZODB.POSException.TransactionError:
-                                transaction.abort() # retry forever (!)
-                            else:
-                                break
                     except zc.async.interfaces.BadStatusError:
                         transaction.abort()
                         zc.async.utils.log.error( # notice, not tracelog
@@ -169,6 +193,7 @@
                         if job.status == zc.async.interfaces.CALLBACKS:
                             job.resumeCallbacks() # moves the job off the agent
                         else:
+                            count = 0
                             while 1:
                                 status = job.status
                                 if status == zc.async.interfaces.COMPLETED:
@@ -180,17 +205,38 @@
                                     job.fail() # moves the job off the agent
                                 try:
                                     transaction.commit()
-                                except ZODB.POSException.TransactionError:
+                                except (ZODB.POSException.TransactionError,
+                                        ZODB.POSException.POSError):
+                                    if count and not count % 10:
+                                        zc.async.utils.log.critical(
+                                            'frequent database errors!  '
+                                            'I retry forever...',
+                                            exc_info=True)
+                                    time.sleep(1)
                                     transaction.abort() # retry forever (!)
                                 else:
                                     break
+                    except zc.async.utils.EXPLOSIVE_ERRORS:
+                        transaction.abort()
+                        raise
+                    except:
+                        # all errors should have been handled by the job at
+                        # this point, so anything other than BadStatusError,
+                        # SystemExit or KeyboardInterrupt is a bad surprise.
+                        transaction.abort()
+                        zc.async.utils.log.critical(
+                            'unexpected error', exc_info=True)
+                        raise
                     # should come before 'completed' for threading dance
                     if isinstance(job.result, twisted.python.failure.Failure):
                         info['failed'] = True
                         info['result'] = job.result.getTraceback(
-                            elideFrameworkCode=True, detail='verbose')
+                            elideFrameworkCode=True)
                     else:
                         info['result'] = repr(job.result)
+                    if len(info['result']) > 10000:
+                        info['result'] = (
+                            info['result'][:10000] + '\n[...TRUNCATED...]')
                     info['completed'] = datetime.datetime.utcnow()
                 finally:
                     local.job = None
@@ -198,14 +244,14 @@
                 zc.async.utils.tracelog.info(
                     'completed in thread %d: %s',
                     info['thread'], info['call'])
-                job = self.queue.get()
+                job_info = self.queue.get()
         finally:
             conn.close()
             if self.dispatcher.activated:
                 # this may cause some bouncing, but we don't ever want to end
                 # up with fewer than needed.
                 self.dispatcher.reactor.callFromThread(self.setSize)
-    
+
     def setSize(self, size=None):
         # this should only be called from the thread in which the reactor runs
         # (otherwise it needs locks)
@@ -233,7 +279,7 @@
                 self.queue.put(None)
         return size - old # size difference
 
-# this is mostly for testing
+# this is mostly for testing, though ``get`` comes in handy generally
 
 _dispatchers = {}
 
@@ -249,6 +295,8 @@
 
 clear = _dispatchers.clear
 
+# end of testing bits
+
 class Dispatcher(object):
 
     activated = False
@@ -292,57 +340,22 @@
         self.dead_pools = []
 
     def _getJob(self, agent):
-        try:
-            job = agent.claimJob()
-        except zc.twist.EXPLOSIVE_ERRORS:
-            transaction.abort()
-            raise
-        except:
-            transaction.abort()
-            zc.async.utils.log.error(
-                'Error trying to get job for UUID %s from '
-                'agent %s (oid %s) in queue %s (oid %s)', 
-                self.UUID, agent.name, agent._p_oid,
-                agent.queue.name,
-                agent.queue._p_oid, exc_info=True)
-            return zc.twist.Failure()
-        res = self._commit(
-            'Error trying to commit getting a job for UUID %s from '
-            'agent %s (oid %s) in queue %s (oid %s)' % (
-            self.UUID, agent.name, agent._p_oid,
-            agent.queue.name,
-            agent.queue._p_oid))
-        if res is None:
-            # Successful commit
-            res = job
+        identifier = (
+            'getting job for UUID %s from agent %s (oid %d) '
+            'in queue %s (oid %d)' % (
+                self.UUID, agent.name, ZODB.utils.u64(agent._p_oid),
+                agent.queue.name, ZODB.utils.u64(agent.queue._p_oid)))
+        res = zc.async.utils.try_transaction_five_times(
+            agent.claimJob, identifier, transaction)
+        if isinstance(res, twisted.python.failure.Failure):
+            identifier = 'stashing failure on agent %s (oid %s)' % (
+                agent.name, ZODB.utils.u64(agent._p_oid))
+            def setFailure():
+                agent.failure = res
+            zc.async.utils.try_transaction_five_times(
+                setFailure, identifier, transaction)
         return res
 
-    def _commit(self, debug_string=''):
-        retry = 0
-        while 1:
-            try:
-                transaction.commit()
-            except ZODB.POSException.TransactionError:
-                transaction.abort()
-                if retry >= 5:
-                    zc.async.utils.log.error(
-                        'Repeated transaction error trying to commit in '
-                        'zc.async: %s', 
-                        debug_string, exc_info=True)
-                    return zc.twist.Failure()
-                retry += 1
-            except zc.twist.EXPLOSIVE_ERRORS:
-                transaction.abort()
-                raise
-            except:
-                transaction.abort()
-                zc.async.utils.log.error(
-                    'Error trying to commit: %s', 
-                    debug_string, exc_info=True)
-                return zc.twist.Failure()
-            else:
-                break
-
     def poll(self):
         poll_info = PollInfo()
         started_jobs = []
@@ -358,29 +371,30 @@
                     queue.dispatchers.register(self.UUID)
                 da = queue.dispatchers[self.UUID]
                 if queue._p_oid not in self._activated:
-                    if da.activated:
-                        if da.dead:
-                            da.deactivate()
-                        else:
-                            zc.async.utils.log.error(
-                                'UUID %s already activated in queue %s '
-                                '(oid %d): another process?  (To stop '
-                                'poll attempts in this process, set '
-                                '``zc.async.dispatcher.get().activated = '
-                                "False``.  To stop polls permanently, don't "
-                                'start a zc.async.dispatcher!)',
-                                self.UUID, queue.name,
-                                ZODB.utils.u64(queue._p_oid))
-                            continue
-                    da.activate()
-                    self._activated.add(queue._p_oid)
-                    # removed below if transaction fails
-                    res = self._commit(
-                        'Error trying to commit activation of UUID %s in '
-                        'queue %s (oid %s)' % (
-                            self.UUID, queue.name, queue._p_oid))
-                    if res is not None:
-                        self._activated.remove(queue._p_oid)
+                    identifier = (
+                        'activating dispatcher UUID %s in queue %s (oid %d)' %
+                        (self.UUID, queue.name, ZODB.utils.u64(queue._p_oid)))
+                    def activate():
+                        if da.activated:
+                            if da.dead:
+                                da.deactivate()
+                            else:
+                                zc.async.utils.log.error(
+                                    'UUID %s already activated in queue %s '
+                                    '(oid %d): another process?  (To stop '
+                                    'poll attempts in this process, set '
+                                    '``zc.async.dispatcher.get().activated = '
+                                    "False``.  To stop polls permanently, don't "
+                                    'start a zc.async.dispatcher!)',
+                                    self.UUID, queue.name,
+                                    ZODB.utils.u64(queue._p_oid))
+                                return False
+                        da.activate()
+                        return True
+                    if zc.async.utils.try_transaction_five_times(
+                        activate, identifier, transaction) is True:
+                        self._activated.add(queue._p_oid)
+                    else:
                         continue
                 queue_info = poll_info[queue.name] = {}
                 pools = self.queues.get(queue.name)
@@ -398,7 +412,7 @@
                     try:
                         agent_info['size'] = agent.size
                         agent_info['len'] = len(agent)
-                    except zc.twist.EXPLOSIVE_ERRORS:
+                    except zc.async.utils.EXPLOSIVE_ERRORS:
                         raise
                     except:
                         agent_info['error'] = zc.twist.Failure()
@@ -419,17 +433,6 @@
                         if isinstance(job, twisted.python.failure.Failure):
                             agent_info['error'] = job
                             job = None
-                            try:
-                                agent.failure = res
-                            except zc.twist.EXPLOSIVE_ERRORS:
-                                raise
-                            except:
-                                transaction.abort()
-                                zc.async.utils.log.error(
-                                    'error trying to stash failure on agent')
-                            else:
-                                # TODO improve msg
-                                self._commit('trying to stash failure on agent')
                         else:
                             info = {'result': None,
                                     'failed': False,
@@ -448,8 +451,10 @@
                             pool.queue.put(
                                 (job._p_oid, dbname, info))
                             job = self._getJob(agent)
-                queue.dispatchers.ping(self.UUID)
-                self._commit('trying to commit ping')
+                identifier = 'committing ping for UUID %s' % (self.UUID,)
+                zc.async.utils.try_transaction_five_times(
+                    lambda: queue.dispatchers.ping(self.UUID), identifier,
+                    transaction)
                 if len(pools) > len(queue_info):
                     conn_delta = 0
                     for name, pool in pools.items():
@@ -539,13 +544,16 @@
         try:
             transaction.begin()
             try:
-                queues = self.conn.root().get(zc.async.interfaces.KEY)
-                if queues is not None:
-                    for queue in queues.values():
-                        da = queue.dispatchers.get(self.UUID)
-                        if da is not None and da.activated:
-                            da.deactivate()
-                    self._commit('trying to tear down')
+                identifier = 'cleanly deactivating UUID %s' % (self.UUID,)
+                def deactivate_das():
+                    queues = self.conn.root().get(zc.async.interfaces.KEY)
+                    if queues is not None:
+                        for queue in queues.values():
+                            da = queue.dispatchers.get(self.UUID)
+                            if da is not None and da.activated:
+                                da.deactivate()
+                zc.async.utils.try_transaction_five_times(
+                    deactivate_das, identifier, transaction)
             finally:
                 transaction.abort()
                 self.conn.close()
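
Editor's sketch (not part of the commit): the ``AgentThreadPool`` change above sleeps for ``backoff`` seconds after each ``ClientDisconnected`` error and then raises the delay by ``incremental_backoff``, capped at ``maximum_backoff``. The standalone function below reproduces only that arithmetic so the resulting delays are easy to see. The name ``backoff_schedule`` and its parameters are hypothetical; the defaults mirror the class attributes added in the diff (5, 5, and 60).

```python
def backoff_schedule(attempts, initial=5, increment=5, maximum=60):
    """Return the delays (in seconds) used for ``attempts`` consecutive retries.

    Hypothetical helper mirroring the update rule in the diff:
    sleep for ``backoff``, then set backoff = min(maximum, backoff + increment).
    """
    delays = []
    backoff = initial
    for _ in range(attempts):
        delays.append(backoff)
        backoff = min(maximum, backoff + increment)
    return delays


print(backoff_schedule(14))
# [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 60, 60]
```

With these defaults the delay reaches the 60-second cap on the twelfth consecutive disconnect and stays there, so a long ZEO outage is polled about once a minute.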

Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/dispatcher.txt	2008-06-20 01:18:18 UTC (rev 87584)
@@ -8,29 +8,29 @@
 must have several methods:
 
 class IReactor(zope.interface.Interface):
-    
+
     def callFromThread(callable, *args, **kw):
         """have callable run in reactor's thread, by reactor, ASAP.
-        
+
         Intended to be called from a thread other than the reactor's main
         loop.
         """
-    
+
     def callInThread(callable, *args, **kw):
         """have callable run in a separate thread, ASAP.
-        
+
         Must be called in same thread as reactor's main loop.
         """
-    
+
     def callLater(seconds, callable, *args, **kw):
         """have callable run in reactor at least <seconds> from now
-        
+
         Must be called in same thread as reactor's main loop.
         """
 
     def addSystemEventTrigger(phase, event, callable, *args, **kw):
         """Install a callable to be run in phase of event.
-        
+
         must support phase 'before', and event 'shutdown'.
         """
 
@@ -140,7 +140,7 @@
 - The queue fired events to announce the dispatcher's registration and
   activation.  We could have registered subscribers for either or both
   of these events to create agents.
-  
+
   Note that the dispatcher in queue.dispatchers is a persistent
   representative of the actual dispatcher: they are different objects.
 
@@ -161,17 +161,17 @@
     True
 
 - The dispatcher made its first ping. A ping means that the dispatcher changes
-  a datetime to record that it is alive. 
+  a datetime to record that it is alive.
 
-    >>> queue.dispatchers[dispatcher.UUID].last_ping is not None
+    >>> queue.dispatchers[dispatcher.UUID].last_ping.value is not None
     True
 
-  The dispatcher needs to update its last_ping after every ``ping_interval``
-  seconds.  If it has not updated the last_ping after ``ping_death_interval``
-  then the dispatcher is considered to be dead, and active jobs in the
-  dispatcher's agents are ended (and given a chance to respond to that status
-  change, so they can put themselves back on the queue to be restarted if
-  desired).
+  The dispatcher needs to update its ``last_ping.value`` after every
+  ``ping_interval`` seconds. If it has not updated the ``last_ping.value``
+  after ``ping_death_interval`` then the dispatcher is considered to be dead,
+  and active jobs in the dispatcher's agents are ended (and given a chance to
+  respond to that status change, so they can put themselves back on the queue
+  to be restarted if desired).
 
     >>> queue.dispatchers[dispatcher.UUID].ping_interval
     datetime.timedelta(0, 30)
@@ -180,7 +180,7 @@
 
 - We have some log entries.  (We're using some magic log handlers inserted by
   setup code in tests.py here.)
-  
+
     >>> print event_logs # doctest: +ELLIPSIS
     zc.async.events INFO
       attempting to activate dispatcher ...
@@ -282,8 +282,8 @@
         {'main':
           {'active jobs': [], 'error': None,
            'new jobs': [(..., 'unnamed')], 'len': 0, 'size': 3}}}
-    
 
+
 [#getPollInfo]_ Notice our ``new jobs`` from the poll and the log has a value
 in it now. We can get some information about that job from the dispatcher.
 
@@ -350,8 +350,18 @@
 
     >>> badjob.parent # doctest: +ELLIPSIS
     <zc.async.agent.Agent object at 0x...>
-    >>> len(badjob.parent)
-    0
+    >>> import time
+    >>> for i in range(60):
+    ...     if len(badjob.parent) == 0:
+    ...         print 'yay'
+    ...         break
+    ...     else:
+    ...         time.sleep(0.1)
+    ...         _ = transaction.begin()
+    ... else:
+    ...     print 'oops'
+    ...
+    yay
 
 ``zc.async.local`` also allows some fun tricks.  Your callable can access the
 queue, perhaps to put another job in.
@@ -371,7 +381,7 @@
 of the last database sync for the thread's connection (at transaction
 boundaries).  This function is ``zc.async.local.getJob()``, and is seen below.
 
-It can also get and set job annotations *live, in another connection*. 
+It can also get and set job annotations *live, in another connection*.
 This allows you to send messages about job progress, or get live
 information about whether you should change or stop your work, for
 instance.
@@ -452,8 +462,8 @@
     >>> import ZODB.FileStorage
     >>> storage = ZODB.FileStorage.FileStorage(
     ...     'main.fs', create=True)
-    >>> from ZODB.DB import DB 
-    >>> db = DB(storage) 
+    >>> from ZODB.DB import DB
+    >>> db = DB(storage)
     >>> conn = db.open()
     >>> root = conn.root()
     >>> import zc.async.configure
@@ -461,7 +471,7 @@
 
 .. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
     find this poll information also.
-    
+
     >>> dispatcher.getPollInfo(at=poll.key) is poll
     True
     >>> dispatcher.getPollInfo(at=poll.utc_timestamp) is poll
@@ -480,7 +490,7 @@
 .. [#show_error] OK, so you want to see a verbose traceback?  OK, you asked
     for it. We're eliding more than 90% of this, and this is a small one,
     believe it or not. Rotate your logs!
-    
+
     Notice that all of the values in the logs are reprs.
 
     >>> bad_job = queue.put(
@@ -489,8 +499,8 @@
 
     >>> wait_for_result(bad_job)
     <zc.twist.Failure exceptions.TypeError>
-    
-    >>> for r in reversed(trace_logs.records):
+
+    >>> for r in reversed(event_logs.records):
     ...     if r.levelname == 'ERROR':
     ...         break
     ... else:
@@ -500,12 +510,9 @@
     <zc.async.job.Job (oid ..., db 'unnamed')
      ``<built-in function mul>(14, None)``> failed with traceback:
     *--- Failure #... (pickled) ---
-    .../zc/async/job.py:...: _call_with_retry(...)
+    .../zc/async/job.py:...: __call__(...)
      [ Locals ]...
      ( Globals )...
-    .../zc/async/job.py:...: <lambda>(...)
-     [ Locals ]...
-     ( Globals )...
     exceptions.TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'
     *--- End of Failure #... ---
     <BLANKLINE>

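The dispatcher.txt changes above describe the liveness rule in prose: a dispatcher must refresh ``last_ping.value`` every ``ping_interval``, and is treated as dead once ``ping_death_interval`` has also elapsed. A minimal sketch of that check follows, using the formula stated in the IQueue.ping docstring in interfaces.py. The function name ``dispatcher_is_dead`` and the handling of a missing ping are assumptions made for illustration, not part of zc.async, and the sketch is written for current Python 3 rather than the Python 2 used in this checkin.

```python
import datetime


def dispatcher_is_dead(last_ping, ping_interval, ping_death_interval, now):
    """Return True if the dispatcher has missed its ping deadline.

    Hypothetical helper: mirrors the documented rule
    (last_ping + ping_interval + ping_death_interval) < now.
    """
    if last_ping is None:
        # Assumption: a dispatcher that has never pinged is not yet
        # considered dead by this sketch.
        return False
    return last_ping + ping_interval + ping_death_interval < now


# Illustrative values only; 30 seconds matches the ping_interval shown in
# the doctest, the death interval here is made up for the example.
utc = datetime.timezone.utc
last = datetime.datetime(2008, 6, 20, 1, 0, 0, tzinfo=utc)
interval = datetime.timedelta(seconds=30)
death = datetime.timedelta(seconds=60)
late = last + datetime.timedelta(seconds=91)
print(dispatcher_is_dead(last, interval, death, late))
```

The comparison is strict, so a dispatcher sitting exactly on the deadline is still considered alive in this sketch.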
Modified: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/interfaces.py	2008-06-20 01:18:18 UTC (rev 87584)
@@ -28,12 +28,12 @@
 except ImportError:
     class IDatabaseOpenedEvent(zope.interface.Interface):
         """The main database has been opened."""
-    
+
         database = zope.interface.Attribute("The main database.")
-    
+
     class DatabaseOpened(object):
         zope.interface.implements(IDatabaseOpenedEvent)
-    
+
         def __init__(self, database):
             self.database = database
 
@@ -51,31 +51,31 @@
 
 class IReactor(zope.interface.Interface):
     """This describes what the dispatcher expects of the reactor.
-    
+
     The reactor does not need to actually provide this interface."""
-    
+
     def callFromThread(callable, *args, **kw):
         """have callable run in reactor's thread, by reactor, ASAP.
-        
+
         Intended to be called from a thread other than the reactor's main
         loop.
         """
-    
+
     def callInThread(callable, *args, **kw):
         """have callable run in a separate thread, ASAP.
-        
+
         Must be called in same thread as reactor's main loop.
         """
-    
+
     def callLater(seconds, callable, *args, **kw):
         """have callable run in reactor at least <seconds> from now
-        
+
         Must be called in same thread as reactor's main loop.
         """
 
     def addSystemEventTrigger(phase, event, callable, *args, **kw):
         """Install a callable to be run in phase of event.
-        
+
         must support phase 'before', and event 'shutdown'.
         """
 
@@ -83,10 +83,35 @@
         """run callable now if running, or when started.
         """
 
+class IRetryPolicy(zope.interface.Interface):
+    def jobError(failure, data_cache):
+        """whether and how to retry after an error while performing job.
 
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to reschedule
+        immediately, before any pending calls in the queue."""
+
+    def commitError(failure, data_cache):
+        """whether to retry after trying to commit a job's successful result.
+
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to reschedule
+        immediately, before any pending calls in the queue."""
+
+    def interrupted():
+        """whether to retry after a dispatcher dies when job was in progress.
+
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to reschedule
+        immediately, before any pending calls in the queue."""
+
+    def updateData(data_cache):
+        """right before committing a job, retry is given a chance to stash
+        information it has saved in the data_cache."""
+
 class IObjectEvent(zope.interface.Interface):
     """Event happened to object"""
-    
+
     object = zope.interface.Attribute('the object')
 
 class AbstractObjectEvent(object):
@@ -136,9 +161,13 @@
 
 class AbortedError(Exception):
     """An explicit abort, as generated by the default behavior of
-    IJob.fail"""
+    IJob.handleInterrupt"""
 
 
+class TimeoutError(Exception):
+    """A time out caused by a ``begin_by`` value."""
+
+
 class BadStatusError(Exception):
     """The job is not in the status it should be for the call being made.
     This is almost certainly a programmer error."""
@@ -168,7 +197,7 @@
         """One of constants defined in zc.async.interfaces:
         NEW, PENDING, ASSIGNED, ACTIVE, CALLBACKS, COMPLETED.
 
-        NEW means not added to a queue and not yet called.        
+        NEW means not added to a queue and not yet called.
         PENDING means added to a queue but not an agent, and not yet called.
         ASSIGNED means added to an agent and not yet called.
         ACTIVE means in the process of being called.
@@ -207,10 +236,8 @@
         self.args for the call, and any kwargs effectively update self.kwargs
         for the call."""
 
-    def fail(e=AbortedError):
-        """Fail this job, with option error e.  May only be called when
-        job is in PENDING or ACTIVE states, or else raises BadStatusError.
-        If e is not provided,"""
+    def handleInterrupt():
+        """use IRetryPolicy to decide whether to abort."""
 
     def resumeCallbacks():
         """Make all callbacks remaining for this job.  Any callbacks
@@ -229,7 +256,7 @@
 #         """a set of selected worker UUIDs.  If it is empty, it is
 #         interpreted as the set of all available workerUUIDs.  Only
 #         workers with UUIDs in the set may perform it.
-# 
+#
 #         If a worker would have selected this job for a run, but the
 #         difference of selected_workerUUIDs and excluded_workerUUIDs
 #         stopped it, it is responsible for verifying that the effective
@@ -250,26 +277,26 @@
 
 class IAgent(zope.interface.common.sequence.IFiniteSequence):
     """Responsible for picking jobs and keeping track of them.
-    
+
     An agent is a persistent object in a queue that is associated with a
     dispatcher and is responsible for picking jobs and keeping track of
     them. Zero or more agents within a queue can be associated with a
-    dispatcher. 
-    
+    dispatcher.
+
     Each agent for a given dispatcher is identified uniquely with a
     name.  A fully (universally) unique identifier for the agent can be
     obtained by combining the key of the agent's queue in the main queue
     mapping at the ZODB root; the UUID of the agent's dispatcher; and
     the agent's name.
     """
-    
+
     size = zope.interface.Attribute(
         """The maximum number of jobs this agent should have active at a time.
         """)
 
     name = zope.interface.Attribute(
         """The name for this agent.  Unique within its dispatcher's jobs for
-        its queue.  Can be used to obtain agent with 
+        its queue.  Can be used to obtain agent with
         queue.dispatchers[*dispatcher UUID*][*name*].""")
 
     completed = zope.interface.Attribute(
@@ -305,9 +332,9 @@
         Remember that IJobs are not guaranteed to be run in order
         added to a queue.  If you need sequencing, use
         IJob.addCallbacks.
-        
+
         item must be an IJob, or be adaptable to that interface.
-        begin_after must be None (to leave the job's current value) or a 
+        begin_after must be None (to leave the job's current value) or a
         datetime.datetime.  begin_by must be None (to leave it alone) or a
         datetime.timedelta of the duration after the begin_after.
 
@@ -326,11 +353,14 @@
         queue on the `assignerUUID`.
         """
 
+    def putBack(item):
+        """Return a previously claimed job to the top of the queue."""
+
     def pull(index=0):
         """Remove and return a job, by default from the front of the queue.
 
         Raise IndexError if index does not exist.
-        
+
         This is the blessed way to remove an unclaimed job from the queue so
         that dispatchers will not try to perform it.
         """
@@ -356,7 +386,7 @@
     def ping(UUID):
         """responsible for setting ping time if necessary for this
         dispatcher agent, and for decommissioning dead dispatchers for
-        the next highest dispatcher (sorted by UUID) if its (last_ping +
+        the next highest dispatcher (sorted by UUID) if its (last_ping.value +
         ping_interval + ping_death_interval) < now.  If this is the
         highest dispatcher UUID, cycles around to lowest."""
 

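The IRetryPolicy docstrings above allow three kinds of return value: a boolean, a datetime, or a timedelta. The sketch below shows one way a caller might interpret them. It is a hedged illustration only: ``interpret_retry`` and the action labels are hypothetical names, not zc.async API, and the code is written for current Python 3. The type checks come before the truth test because an empty ``datetime.timedelta()`` is falsy in Python, yet the interface defines it as "reschedule immediately" rather than "do not retry".

```python
import datetime

UTC = datetime.timezone.utc


def interpret_retry(value, now):
    """Map an IRetryPolicy-style return value to (action, when).

    Hypothetical helper for illustration; ``now`` must be timezone-aware.
    """
    # Check the date types first: timedelta() is falsy, so a plain
    # truth test would wrongly treat "reschedule now" as "give up".
    if isinstance(value, datetime.timedelta):
        return ('reschedule', max(now + value, now))
    if isinstance(value, datetime.datetime):
        if value.tzinfo is None:
            # Assumption: naive datetimes are taken to be UTC.
            value = value.replace(tzinfo=UTC)
        return ('reschedule', max(value, now))
    if value:
        return ('retry', None)
    return ('give up', None)


now = datetime.datetime(2008, 6, 20, 1, 18, 18, tzinfo=UTC)
print(interpret_retry(True, now))
print(interpret_retry(datetime.timedelta(), now))
print(interpret_retry(datetime.timedelta(seconds=30), now))
```

Times in the past are clamped to ``now`` here, which is a simplification chosen for the sketch.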
Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/job.py	2008-06-20 01:18:18 UTC (rev 87584)
@@ -11,11 +11,14 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
+import time
 import types
 import datetime
+import logging
 
 import BTrees.OOBTree
 import ZODB.POSException
+import ZEO.Exceptions
 import transaction.interfaces
 import persistent
 import persistent.list
@@ -74,6 +77,131 @@
                     elif status == zc.async.interfaces.CALLBACKS:
                         a.resumeCallbacks()
 
+class RetryCommonFourTimes(persistent.Persistent): # default
+    zope.component.adapts(zc.async.interfaces.IJob)
+    zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+
+    # exceptions, data_cache key, max retry, initial backoff seconds,
+    # incremental backoff seconds, max backoff seconds
+    internal_exceptions = (
+        ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
+         None, 5, 5, 60),
+        ((ZODB.POSException.TransactionError,), 'transaction_error',
+         5, 0, 0, 0),
+    )
+    transaction_exceptions = internal_exceptions
+    max_interruptions = 9
+    log_every = 5
+
+    def __init__(self, job):
+        self.parent = self.__parent__ = job
+        self.data = BTrees.family32.OO.BTree()
+
+    def updateData(self, data_cache):
+        if 'first_active' in self.data and 'first_active' in data_cache:
+            data_cache.pop('first_active')
+        self.data.update(data_cache)
+
+    def jobError(self, failure, data_cache):
+        return self._process(failure, data_cache, self.internal_exceptions)
+
+    def commitError(self, failure, data_cache):
+        return self._process(failure, data_cache, self.transaction_exceptions)
+
+    def _process(self, failure, data_cache, exceptions):
+        for (exc, key, max_count, init_backoff,
+             incr_backoff, max_backoff) in exceptions:
+            if failure.check(*exc) is not None:
+                count = data_cache.get(key, 0) + 1
+                if max_count is not None and count >= max_count:
+                    zc.async.utils.tracelog.info(
+                        'Retry policy for job %r is not retrying after %d '
+                        'counts of %s occurrences', self.parent, count, key)
+                    return False
+                elif count==1 or not count % self.log_every:
+                    zc.async.utils.tracelog.info(
+                        'Retry policy for job %r requests another attempt '
+                        'after %d counts of %s occurrences', self.parent,
+                        count, key)
+                backoff = min(max_backoff,
+                              (init_backoff + (count-1) * incr_backoff))
+                if backoff:
+                    time.sleep(backoff)
+                data_cache[key] = count
+                data_cache['last_' + key] = failure
+                if 'first_active' not in data_cache:
+                    data_cache['first_active'] = self.parent.active_start
+                return True
+        return False
+
+    def interrupted(self):
+        if 'first_active' not in self.data:
+            self.data['first_active'] = self.parent.active_start
+        count = self.data['interruptions'] = self.data.get('interruptions', 0) + 1
+        if self.max_interruptions is None or count <= self.max_interruptions:
+            if count==1 or not count % self.log_every:
+                zc.async.utils.tracelog.info(
+                    'Retry policy for job %r requests another attempt '
+                    'after %d interrupts', self.parent, count)
+            return True
+        else:
+            zc.async.utils.tracelog.info(
+                'Retry policy for job %r is not retrying after %d '
+                'interrupts', self.parent, count)
+            return False
+
+
+class RetryCommonForever(RetryCommonFourTimes):
+    # retry on ZEO failures and Transaction errors during the job forever
+    # retry on commitErrors and interrupteds forever.
+    internal_exceptions = (
+        ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
+         None, 5, 5, 60),
+        ((ZODB.POSException.TransactionError,), 'transaction_error',
+         None, 0, 0, 0),
+    )
+
+    max_interruptions = None
+
+    def commitError(self, failure, data_cache):
+        res = super(RetryCommonForever, self).commitError(failure, data_cache)
+        if not res:
+            # that just means we didn't record it.  We actually are going to
+            # retry.
+            key = 'other'
+            data_cache['other'] = data_cache.get('other', 0) + 1
+            data_cache['last_other'] = failure
+            if 'first_active' not in data_cache:
+                data_cache['first_active'] = self.parent.active_start
+        return True # always retry
+
+class NeverRetry(persistent.Persistent):
+    zope.component.adapts(zc.async.interfaces.IJob)
+    zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+
+    def __init__(self, job):
+        self.parent = self.__parent__ = job
+
+    def updateData(self, data_cache):
+        pass
+
+    def jobError(self, failure, data_cache):
+        return False
+
+    def commitError(self, failure, data_cache):
+        return False
+
+    def interrupted(self):
+        return False
+
+def callback_retry_policy_factory(job):
+    res = zope.component.queryAdapter(
+        job, zc.async.interfaces.IRetryPolicy, 'callback')
+    if res is None:
+        res = RetryCommonForever(job)
+    return res
+
+
 class Job(zc.async.utils.Base):
 
     zope.interface.implements(zc.async.interfaces.IJob)
@@ -82,7 +210,11 @@
     _status = zc.async.interfaces.NEW
     _begin_after = _begin_by = _active_start = _active_end = None
     key = None
-    
+    _retry_policy = None
+    retry_policy_factory = None # effectively "look up IRetryPolicy adapter
+    # for '' (empty string) name, and use RetryCommonFourTimes if the adapter
+    # doesn't exist"
+    failure_log_level = None # effectively logging.ERROR
     assignerUUID = None
     _quota_names = ()
 
@@ -227,7 +359,8 @@
     @rwproperty.setproperty
     def callable(self, value):
         # can't pickle/persist methods by default as of this writing, so we
-        # add the sugar ourselves
+        # add the sugar ourselves.  In future, would like for args to be
+        # potentially methods of persistent objects too...
         if self._status != zc.async.interfaces.NEW:
             raise zc.async.interfaces.BadStatusError(
                 'can only set callable when a job has NEW, PENDING, or '
@@ -243,12 +376,31 @@
         if zc.async.interfaces.IJob.providedBy(self._callable_root):
             self._callable_root.parent = self
 
-    def addCallbacks(self, success=None, failure=None):
+    def addCallbacks(self, success=None, failure=None,
+                     failure_log_level=None, retry_policy_factory=None):
         if success is not None or failure is not None:
             if success is not None:
                 success = zc.async.interfaces.IJob(success)
+                if failure_log_level is not None:
+                    success.failure_log_level = failure_log_level
+                elif success.failure_log_level is None:
+                    success.failure_log_level = logging.CRITICAL
+                if retry_policy_factory is not None:
+                    success.retry_policy_factory = retry_policy_factory
+                elif success.retry_policy_factory is None:
+                    success.retry_policy_factory = (
+                        callback_retry_policy_factory)
             if failure is not None:
                 failure = zc.async.interfaces.IJob(failure)
+                if failure_log_level is not None:
+                    failure.failure_log_level = failure_log_level
+                elif failure.failure_log_level is None:
+                    failure.failure_log_level = logging.CRITICAL
+                if retry_policy_factory is not None:
+                    failure.retry_policy_factory = retry_policy_factory
+                elif failure.retry_policy_factory is None:
+                    failure.retry_policy_factory = (
+                        callback_retry_policy_factory)
             res = Job(success_or_failure, success, failure)
             if success is not None:
                 success.parent = res
@@ -260,12 +412,14 @@
             abort_handler = zc.async.interfaces.IJob(
                 completeStartedJobArguments)
             abort_handler.args.append(res)
-            res.addCallback(abort_handler)
+            res.addCallback(
+                abort_handler, failure_log_level, retry_policy_factory)
         else:
             res = self
         return res
 
-    def addCallback(self, callback):
+    def addCallback(self, callback, failure_log_level=None,
+                    retry_policy_factory=None):
         callback = zc.async.interfaces.IJob(callback)
         self.callbacks.put(callback)
         callback.parent = self
@@ -274,96 +428,289 @@
         else:
             self._p_changed = True # to try and fire conflict errors if
             # our reading of self.status has changed beneath us
+        if failure_log_level is not None:
+            callback.failure_log_level = failure_log_level
+        elif callback.failure_log_level is None:
+            callback.failure_log_level = logging.CRITICAL
+        if retry_policy_factory is not None:
+            callback.retry_policy_factory = retry_policy_factory
+        elif callback.retry_policy_factory is None:
+            callback.retry_policy_factory = callback_retry_policy_factory
         return callback
 
+    def getRetryPolicy(self):
+        if self._retry_policy is not None:
+            return self._retry_policy
+        if self.retry_policy_factory is None:
+            # first try to look up adapter with name of ''; then if that fails
+            # use RetryCommonFourTimes
+            res = zope.component.queryAdapter(
+                self, zc.async.interfaces.IRetryPolicy, '')
+            if res is None:
+                res = RetryCommonFourTimes(self)
+        elif isinstance(self.retry_policy_factory, basestring):
+            res = zope.component.getAdapter(
+                self, zc.async.interfaces.IRetryPolicy,
+                self.retry_policy_factory)
+            # this may cause an error. We can't proceed because we don't know
+            # what to do, and it may be *critical* to know. Therefore, in
+            # _getRetry, we rely on never_fail to keep on sending critical
+            # errors in the log, and never stopping.
+        else:
+            res = self.retry_policy_factory(self)
+        self._retry_policy = res
+        return res
+
+    def _getRetry(self, call_name, tm, *args):
+        # if we are after the time that we are supposed to begin_by, no retry
+        if (self.begin_by is not None and self.begin_after is not None and
+            self.begin_by + self.begin_after < datetime.datetime.now(pytz.UTC)):
+            return False
+        # we divide up the two ``never_fail`` calls so that retries in getting
+        # the policy don't affect actually calling the method.
+        identifier = 'getting retry policy for %r' % (self,)
+        policy = zc.async.utils.never_fail(self.getRetryPolicy, identifier, tm)
+        call = getattr(policy, call_name, None)
+        if call is None:
+            zc.async.utils.log.error(
+                'retry policy %r for %r does not have required %s method',
+                policy, self, call_name)
+            return None
+        identifier = 'getting result for %s retry for %r' % (call_name, self)
+        return zc.async.utils.never_fail(lambda: call(*args), identifier, tm)
+
     def __call__(self, *args, **kwargs):
         if self.status not in (zc.async.interfaces.NEW,
                                zc.async.interfaces.ASSIGNED):
             raise zc.async.interfaces.BadStatusError(
                 'can only call a job with NEW or ASSIGNED status')
         tm = transaction.interfaces.ITransactionManager(self)
-        self._status = zc.async.interfaces.ACTIVE
-        self._active_start = datetime.datetime.now(pytz.UTC)
-        tm.commit()
-        effective_args = list(args)
-        effective_args[0:0] = self.args
-        effective_kwargs = dict(self.kwargs)
-        effective_kwargs.update(kwargs)
-        return self._call_with_retry(
-            lambda: self.callable(*effective_args, **effective_kwargs))
-
-    def _call_with_retry(self, call):
-        ct = 0
-        tm = transaction.interfaces.ITransactionManager(self)
+        def prepare():
+            self._status = zc.async.interfaces.ACTIVE
+            self._active_start = datetime.datetime.now(pytz.UTC)
+            effective_args = list(args)
+            effective_args[0:0] = self.args
+            effective_kwargs = dict(self.kwargs)
+            effective_kwargs.update(kwargs)
+            return effective_args, effective_kwargs
+        identifier = 'preparing for call of %r' % (self,)
+        effective_args, effective_kwargs = zc.async.utils.never_fail(
+            prepare, identifier, tm)
+        # this is the calling code.  It is complex and long because it is
+        # trying both to handle exceptions reasonably, and to honor the
+        # IRetryPolicy interface for those exceptions.
+        data_cache = {}
         res = None
         while 1:
             try:
-                res = call()
-                if zc.async.interfaces.IJob.providedBy(res):
-                    res.addCallback(self._callback)
-                    tm.commit()
-                elif isinstance(res, twisted.internet.defer.Deferred):
-                    res.addBoth(zc.twist.Partial(self._callback))
-                    tm.commit()
-                else:
-                    res = self._complete(res, tm)
-            except ZODB.POSException.TransactionError:
+                res = self.callable(*effective_args, **effective_kwargs)
+            except zc.async.utils.EXPLOSIVE_ERRORS:
                 tm.abort()
-                ct += 1
-                if ct >= 5:
-                    res = self._complete(zc.twist.Failure(), tm)
-                    self.resumeCallbacks()
-                else:
+                raise
+            except:
+                res = zc.twist.Failure()
+                tm.abort()
+                retry = self._getRetry('jobError', tm, res, data_cache)
+                if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+                    identifier = (
+                        'rescheduling %r as requested by '
+                        'associated IRetryPolicy %r' % (
+                            self, self.getRetryPolicy()))
+                    if self is zc.async.utils.never_fail(
+                        lambda: self._reschedule(retry, data_cache),
+                        identifier, tm):
+                        return self
+                elif retry:
                     continue
-            except zc.twist.EXPLOSIVE_ERRORS:
+                # policy didn't exist or returned False or couldn't reschedule
+            try:
+                callback = self._set_result(res, tm, data_cache)
+            except zc.async.utils.EXPLOSIVE_ERRORS:
                 tm.abort()
                 raise
             except:
+                failure = zc.twist.Failure()
                 tm.abort()
-                res = self._complete(zc.twist.Failure(), tm)
-                self.resumeCallbacks()
+                retry = self._getRetry('commitError', tm, failure, data_cache)
+                if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+                    identifier = (
+                        'rescheduling %r as requested by '
+                        'associated IRetryPolicy %r' % (
+                            self, self.getRetryPolicy()))
+                    if self is zc.async.utils.never_fail(
+                        lambda: self._reschedule(retry, data_cache),
+                        identifier, tm):
+                        return self
+                elif retry:
+                    continue
+                # policy didn't exist or returned False or couldn't reschedule
+                if isinstance(res, twisted.python.failure.Failure):
+                    log_level = self.failure_log_level
+                    if log_level is None:
+                        log_level = logging.ERROR
+                    zc.async.utils.log.log(
+                        log_level,
+                        'Commit failed for %r (see subsequent traceback).  '
+                        'Prior to this, job failed with traceback:\n%s',
+                        self,
+                        res.getTraceback(
+                            elideFrameworkCode=True, detail='verbose'))
+                else:
+                    zc.async.utils.log.info(
+                        'Commit failed for %r (see subsequent traceback).  '
+                        'Prior to this, job succeeded with result: %r',
+                        self, res)
+                res = failure
+                def complete():
+                    self._result = res
+                    self._status = zc.async.interfaces.CALLBACKS
+                    self._active_end = datetime.datetime.now(pytz.UTC)
+                    policy = self.getRetryPolicy()
+                    if data_cache and self._retry_policy is not None:
+                        self._retry_policy.updateData(data_cache)
+                identifier = 'storing failure at commit of %r' % (self,)
+                zc.async.utils.never_fail(complete, identifier, tm)
+                callback = True
+            if callback:
+                self._log_completion(res)
+                identifier = 'performing callbacks of %r' % (self,)
+                zc.async.utils.never_fail(self.resumeCallbacks, identifier, tm)
+            return res
+
+    def handleInterrupt(self):
+        # should be called within a job that has a RetryCommonForever policy
+        tm = transaction.interfaces.ITransactionManager(self)
+        if self.status == zc.async.interfaces.ACTIVE:
+            retry = self._getRetry('interrupted', tm)
+            if isinstance(retry, (datetime.datetime, datetime.timedelta)):
+                self._reschedule(retry, queue=self.queue)
+            elif retry:
+                self._reschedule(datetime.timedelta(), queue=self.queue)
             else:
-                if self._status == zc.async.interfaces.CALLBACKS:
+                res = zc.twist.Failure(zc.async.interfaces.AbortedError())
+                if self._set_result(res, tm):
                     self.resumeCallbacks()
-            return res
+                self._log_completion(res)
+        elif self.status != zc.async.interfaces.CALLBACKS:
+            # we have to allow CALLBACKS or else some retries will fall over,
+            # because handleInterrupt may fail after a commit of the aborted
+            # error
+            raise zc.async.interfaces.BadStatusError(
+                'can only call ``handleInterrupt`` on a job with ACTIVE '
+                'status') # um...or CALLBACKS, but that's a secret :-D
+        else:
+            self.resumeCallbacks()
 
-    def _callback(self, res):
-        self._call_with_retry(lambda: res)
+    def fail(self, e=None):
+        # something may have fallen over the last time this was called, so we
+        # are careful to only store the error if we're not in the CALLBACKS
+        # status.
+        callback = True
+        status = self.status
+        if status in (zc.async.interfaces.COMPLETED,
+                      zc.async.interfaces.ACTIVE):
+            raise zc.async.interfaces.BadStatusError(
+                'can only call fail on a job with NEW, PENDING, or ASSIGNED '
+                'status') # ...or CALLBACKS, but that's because of
+                # retries, and is semantically incorrect
+        if status != zc.async.interfaces.CALLBACKS:
+            if e is None:
+                e = zc.async.interfaces.TimeoutError()
+            res = zc.twist.Failure(e)
+            callback = self._set_result(
+                res, transaction.interfaces.ITransactionManager(self))
+            self._log_completion(res)
+        if callback:
+            self.resumeCallbacks()
 
-    def _complete(self, res, tm):
-        if isinstance(res, twisted.python.failure.Failure):
-            res = zc.twist.sanitize(res)
-            failure = True
+    def _reschedule(self, when, data_cache=None, queue=None):
+        if not isinstance(when, (datetime.datetime, datetime.timedelta)):
+            raise TypeError('``when`` must be datetime or timedelta')
+        in_agent = zc.async.interfaces.IAgent.providedBy(self.parent)
+        if queue is None:
+            # this is a reschedule from jobError or commitError
+            if not in_agent:
+                zc.async.utils.log.critical(
+                    'error for IRetryPolicy %r on %r: '
+                    'can only reschedule a job directly in an agent',
+                    self.getRetryPolicy(), self)
+                return None
+            queue = self.queue
+        if data_cache is not None and self._retry_policy is not None:
+            self._retry_policy.updateData(data_cache)
+        self._status = zc.async.interfaces.NEW
+        self._active_start = None
+        if in_agent:
+            self.parent.remove(self)
         else:
-            failure = False
-        self._result = res
-        self._status = zc.async.interfaces.CALLBACKS
-        self._active_end = datetime.datetime.now(pytz.UTC)
+            self.parent = None
+        now = datetime.datetime.now(pytz.UTC)
+        if isinstance(when, datetime.datetime):
+            if when.tzinfo is None:
+                when = when.replace(tzinfo=pytz.UTC)
+            if when <= now:
+                queue.putBack(self)
+            else:
+                queue.put(self, begin_after=when)
+        elif isinstance(when, datetime.timedelta):
+            if when <= datetime.timedelta():
+                queue.putBack(self)
+            else:
+                queue.put(self, begin_after=now+when)
+        return self
+
+    def _set_result(self, res, tm, data_cache=None):
+        # returns whether to call ``resumeCallbacks``
+        callback = True
+        if zc.async.interfaces.IJob.providedBy(res):
+            res.addCallback(self._callback)
+            callback = False
+        elif isinstance(res, twisted.internet.defer.Deferred):
+            partial = zc.twist.Partial(self._callback)
+            partial.max_transaction_errors = None # retry conflicts forever
+            res.addBoth(partial)
+            callback = False
+        else:
+            if isinstance(res, twisted.python.failure.Failure):
+                res = zc.twist.sanitize(res)
+            self._result = res
+            self._status = zc.async.interfaces.CALLBACKS
+            self._active_end = datetime.datetime.now(pytz.UTC)
+        if self._retry_policy is not None and data_cache:
+            self._retry_policy.updateData(data_cache)
         tm.commit()
-        if failure:
-            zc.async.utils.tracelog.error(
+        return callback
+
+    def _log_completion(self, res):
+        if isinstance(res, twisted.python.failure.Failure):
+            log_level = self.failure_log_level
+            if log_level is None:
+                log_level = logging.ERROR
+            zc.async.utils.log.log(
+                log_level,
                 '%r failed with traceback:\n%s',
                 self,
-                res.getTraceback(elideFrameworkCode=True, detail='verbose'))
+                res.getTraceback(
+                    elideFrameworkCode=True, detail='verbose'))
         else:
             zc.async.utils.tracelog.info(
-                '%r succeeded with result:\n%r',
+                '%r succeeded with result: %r',
                 self, res)
-        return res
 
-    def fail(self, e=None):
-        if e is None:
-            e = zc.async.interfaces.AbortedError()
-        if self._status not in (zc.async.interfaces.NEW,
-                                zc.async.interfaces.ACTIVE):
-            raise zc.async.interfaces.BadStatusError(
-                'can only call fail on a job with NEW, PENDING, ASSIGNED, or '
-                'ACTIVE status')
-        self._complete(zc.twist.Failure(e),
-                       transaction.interfaces.ITransactionManager(self))
-        self.resumeCallbacks()
+    def _callback(self, res):
+        # done within a job or partial, so we can rely on their retry bits to
+        # some degree.  However, we commit transactions ourselves, so we have
+        # to be a bit careful that the result hasn't been set already.
+        callback = True
+        if self._status == zc.async.interfaces.ACTIVE:
+            callback = self._set_result(
+                res, transaction.interfaces.ITransactionManager(self))
+            self._log_completion(res)
+        if callback:
+            self.resumeCallbacks()
 
     def resumeCallbacks(self):
+        # should be called within a job that has a RetryCommonForever policy
         if self._status != zc.async.interfaces.CALLBACKS:
             raise zc.async.interfaces.BadStatusError(
                 'can only resumeCallbacks on a job with CALLBACKS status')
@@ -372,34 +719,53 @@
         length = 0
         while 1:
             for j in callbacks:
+                # TODO yuck: this mucks in callbacks' protected bits
                 if j._status == zc.async.interfaces.NEW:
-                    zc.async.utils.tracelog.debug(
-                        'starting callback %r to %r', j, self)
-                    j(self.result)
+                    if (j.begin_by is not None and
+                        (j.begin_after + j.begin_by) <
+                        datetime.datetime.now(pytz.UTC)):
+                        zc.async.utils.log.error(
+                            'failing expired callback %r to %r', j, self)
+                        j.fail()
+                    else:
+                        zc.async.utils.tracelog.debug(
+                            'starting callback %r to %r', j, self)
+                        j(self.result)
                 elif j._status == zc.async.interfaces.ACTIVE:
-                    zc.async.utils.tracelog.debug(
-                        'failing aborted callback %r to %r', j, self)
-                    j.fail()
+                    retry = j._getRetry('interrupted', tm)
+                    istime = isinstance(
+                        retry, (datetime.timedelta, datetime.datetime))
+                    if istime:
+                        zc.async.utils.log.error(
+                            'error for IRetryPolicy %r on %r: '
+                            'cannot reschedule a callback, only retry',
+                            j.getRetryPolicy(), j)
+                    if retry or istime:
+                        zc.async.utils.tracelog.debug(
+                            'retrying interrupted callback '
+                            '%r to %r', j, self)
+                        j._status = zc.async.interfaces.NEW
+                        j._active_start = None
+                        j(self.result)
+                    else:
+                        zc.async.utils.tracelog.debug(
+                            'aborting interrupted callback '
+                            '%r to %r', j, self)
+                        j.fail(zc.async.interfaces.AbortedError())
                 elif j._status == zc.async.interfaces.CALLBACKS:
                     j.resumeCallbacks()
                 # TODO: this shouldn't raise anything we want to catch, right?
                 # now, this should catch all the errors except EXPLOSIVE_ERRORS
                 # cleaning up dead jobs should look something like the above.
-            tm.commit()
             tm.begin() # syncs
             # it's possible that someone added some callbacks, so run until
             # we're exhausted.
             length += len(callbacks)
             callbacks = list(self.callbacks)[length:]
             if not callbacks:
-                try:
-                    self._status = zc.async.interfaces.COMPLETED
-                    if zc.async.interfaces.IAgent.providedBy(self.parent):
-                        self.parent.jobCompleted(self)
-                    tm.commit()
-                except ZODB.POSException.TransactionError:
-                    tm.abort()
-                    callbacks = list(self.callbacks)[length:]
-                else:
-                    break # and return
-
+                # this whole method is called within a never_fail...
+                self._status = zc.async.interfaces.COMPLETED
+                if zc.async.interfaces.IAgent.providedBy(self.parent):
+                    self.parent.jobCompleted(self)
+                tm.commit()
+                return
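
The branching in ``_reschedule`` above comes down to one decision: given
``when``, should the job go straight back on the queue, or be queued to begin
later? Below is a minimal standalone sketch of that decision. It is an
illustration only, not zc.async code: ``plan_reschedule`` is a hypothetical
helper, it returns the name of the queue call rather than touching a queue, and
it assumes Python 3 and the standard library's ``datetime.timezone.utc`` where
the diff uses ``pytz.UTC``.

```python
import datetime

UTC = datetime.timezone.utc  # assumption: stdlib stand-in for pytz.UTC


def plan_reschedule(when, now=None):
    """Return ('putBack', None) or ('put', begin_after) for a retry time.

    Hypothetical helper mirroring the datetime/timedelta branches of
    ``_reschedule``; it does not modify any job or queue.
    """
    if not isinstance(when, (datetime.datetime, datetime.timedelta)):
        raise TypeError('``when`` must be datetime or timedelta')
    if now is None:
        now = datetime.datetime.now(UTC)
    if isinstance(when, datetime.datetime):
        if when.tzinfo is None:
            # Naive datetimes are treated as UTC, as in the diff.
            when = when.replace(tzinfo=UTC)
        begin_after = when
    else:
        # A timedelta is interpreted relative to the current time.
        begin_after = now + when
    if begin_after <= now:
        # Already due: the job goes back on the queue immediately.
        return ('putBack', None)
    return ('put', begin_after)
```

Passing ``now`` explicitly keeps the sketch deterministic. A zero or negative
timedelta, or a datetime that is not in the future, yields
``('putBack', None)``; anything later yields ``('put', begin_after)``.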

Modified: zc.async/trunk/src/zc/async/job.txt
===================================================================
--- zc.async/trunk/src/zc/async/job.txt	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/job.txt	2008-06-20 01:18:18 UTC (rev 87584)
@@ -2,34 +2,34 @@
 Jobs
 ====
 
-What if you want to persist a reference to the method of a persistent
-object--you can't persist that normally in the ZODB, but that can be
-very useful, especially to store asynchronous calls.  What if you want
-to act on the result of an asynchronous call that may be called later?
-The zc.async package offers an approach that combines ideas of a partial
-and that of Twisted deferred code: ``zc.async.job.Job``.  
+What if you want to persist a reference to the method of a persistent object?
+You can't persist that normally in the ZODB, but that can be very useful,
+especially to store asynchronous calls. What if you want to act on the result
+of an asynchronous call that may be called later?
 
-To use it, simply wrap the callable--a method of a persistent object or
-a callable persistent object or a global function--in the job.  You can
-include ordered and keyword arguments to the job, which may be
-persistent objects or simply pickleable objects.
+The zc.async package offers an approach that combines ideas of a partial and
+that of a Twisted deferred: ``zc.async.job.Job``. It has code that is specific
+to zc.async, so it is not truly a general-purpose persistent partial, but this
+file shows the Job largely stand-alone.
 
-Unlike a partial but like a Twisted deferred, the result of the wrapped
-call goes on the job's ``result`` attribute, and the immediate return of
-the call might not be the job's end result.  It could also be a failure,
-indicating an exception; or another partial, indicating that we are
-waiting to be called back by the second partial; or a twisted deferred,
-indicating that we are waiting to be called back by a twisted Deferred
-(see the ``zc.twist``).  After you have the partial, you can then use a
-number of methods and attributes on the partial for further set up. 
-Let's show the most basic use first, though.
+To use a job, simply wrap the callable--a method of a persistent object or a
+callable persistent object or a global function--in the job. You can include
+ordered and keyword arguments to the job, which may be persistent objects or
+simply pickleable objects.
 
-Note that, even though this looks like an interactive prompt, all
-functions and classes defined in this document act as if they were
-defined within a module.  Classes and functions defined in an interactive
-prompt are normally not picklable, and Jobs must work with
-picklable objects [#set_up]_.
+Unlike a partial but like a Twisted deferred, the result of the wrapped call
+goes on the job's ``result`` attribute. It could also be a failure, indicating
+an exception; or temporarily, None, indicating that we are waiting to be called
+back by a second Job or a twisted Deferred (see the ``zc.twist`` package).
 
+After you have the job, you can then use a number of methods and attributes
+on the job for further set up. Let's show the most basic use first, though.
+
+(Note that, even though this looks like an interactive prompt, all functions and
+classes defined in this document act as if they were defined within a module.
+Classes and functions defined in an interactive prompt are normally not
+picklable, and Jobs must work with picklable objects [#set_up]_.)
+
     >>> import zc.async.job
     >>> def call():
     ...     print 'hello world'
@@ -52,8 +52,8 @@
     >>> j.status == zc.async.interfaces.NEW
     True
 
-We can call the job from the NEW (or ASSIGNED, see later) status, and
-then see that the function was called, and see the result on the partial.
+We can call the job from the NEW (or PENDING or ASSIGNED, see later) status,
+and then see that the function was called, and see the result on the job.
 
     >>> res = j()
     hello world
@@ -69,8 +69,8 @@
     'my result'
 
 In addition to using a global function, we can also use a method of a
-persistent object.  Imagine we have a ZODB root that we can put objects
-in to.
+persistent object. (For this example, imagine we have a ZODB root into which we
+can put objects.)
 
     >>> import persistent
     >>> class Demo(persistent.Persistent):
@@ -117,13 +117,13 @@
     ...
     exceptions.RuntimeError: Bad Things Happened Here
 
-Note that all calls can return a failure explicitly, rather than raising
-an exception that the job converts to an exception.  However, there
-is an important difference in behavior.  If a wrapped call raises an
-exception, the job aborts the transaction; but if the wrapped call
-returns a failure, no abort occurs.  Wrapped calls that explicitly return
-failures are thus responsible for any necessary transaction aborts.  See
-the footnote for an example [#explicit_failure_example]_.
+Note that all calls can return a failure explicitly, rather than raising an
+exception that the job converts to a failure. However, there is an important
+difference in behavior. If a wrapped call raises an exception, the job aborts
+the transaction; but if the wrapped call returns a failure, no automatic abort
+occurs. Wrapped calls that explicitly return failures are thus responsible for
+any necessary transaction aborts. See the footnote for an example
+[#explicit_failure_example]_.
 
 Now let's return a job from the job.  This generally represents a result
 that is waiting on another asynchronous persistent call, which would
@@ -236,23 +236,24 @@
 topic.
 
 Callbacks
----------
+=========
 
-The job object can also be used to handle return values and
-exceptions from the call.  The ``addCallbacks`` method enables the
-functionality.  Its signature is (success=None, failure=None).  It may
-be called multiple times, each time adding a success and/or failure
-callable that takes an end result: a value or a zc.async.Failure object,
-respectively.  Failure objects are passed to failure callables, and
-any other results are passed to success callables.
+The job object can also be used to handle return values and exceptions from the
+call. The ``addCallbacks`` method enables the functionality. Its signature is
+(success=None, failure=None). It may be called multiple times, each time adding
+a success and/or failure callable that takes the end result of the original,
+parent job: a value or a zc.async.Failure object, respectively. Failure objects
+are passed to failure callables, and any other results are passed to success
+callables.
 
-The return value of the success and failure callables is
-important for chains and for determining whether a job had any
-errors that need logging, as we'll see below.  The call to
-``addCallbacks`` returns a job, which can be used for chaining (see
-``Chaining Callbacks``_).
+Note that, unlike with Twisted deferreds, the results of callbacks for a given
+job are not chained.  To chain, add a callback to the desired callback.  The
+primary reason for a Twisted callback is try:except:else:finally logic.  In
+most cases, because zc.async jobs are long-running, the try:except logic can be
+accomplished within the code for the job itself.  For certain kinds of
+exceptions, an IRetryPolicy is used instead.
 
-Let's look at a simple example.
+Let's look at a simple example of a callback.
 
     >>> def call(*args):
     ...     res = 1
@@ -333,6 +334,7 @@
     failure. [Failure instance: Traceback: exceptions.TypeError...]
     also a failure. [Failure instance: Traceback: exceptions.TypeError...]
 
+------------------
 Chaining Callbacks
 ------------------
 
@@ -384,6 +386,7 @@
     >>> isinstance(j.result, twisted.python.failure.Failure)
     True
 
+--------------------------
 Callbacks on Completed Job
 --------------------------
 
@@ -403,6 +406,7 @@
     >>> j.status == zc.async.interfaces.COMPLETED
     True
 
+-------------
 Chaining Jobs
 -------------
 
@@ -422,33 +426,68 @@
     success! 15
     also a success! 60
 
-Failing
--------
+Failures
+========
 
-Speaking again of failures, it's worth discussing two other aspects of
-failing.  One is that jobs offer an explicit way to fail a call.  It
-can be called when the job has a NEW, PENDING, ASSIGNED or ACTIVE status. 
-The primary use cases for this method are to cancel a job that is
-overdue to start, and to cancel a job that was in progress by a
-worker thread in a dispatcher when the dispatcher died (more on that below).
+Let's talk more about failures.  We'll divide them into two large types.
 
+- The code you ran in the job failed.
+
+- The system around your code (zc.async code, the ZODB, or even the machine on
+  which the job is running) failed.
+
+If your code fails, it's mostly up to you to deal with it.  There are two
+mechanisms to use: callbacks, which we've already seen, and retry policies,
+which we'll look at below.
+
+If the system fails around your code, four sorts of things might happen. In the
+context of the full zc.async system, it is the responsibility of the zc.async
+code to react as described below, and, when pertinent, your responsibility to
+make sure that the retry policy for the job does what you need.
+
+- The job never started, and timed out. zc.async should call ``fail`` on the
+  job, which aborts the job and begins callbacks.  (This is handled in the
+  queue's ``claim`` method, in the full context of zc.async.)
+
+- The job started but was unable to commit. Internally to the job's machinery,
+  the job uses a retry policy to decide what to do, as described below.
+
+- The job started but then was interrupted. zc.async should call
+  ``handleInterrupt``, in which a retry policy decides what to do, as described
+  below. (This is handled in a DispatcherAgents collection's ``deactivate``
+  method, in the full context of zc.async.)
+
+- The job completed, but the callbacks were interrupted. zc.async should call
+  ``resumeCallbacks``, which will handle the remaining callbacks in the manner
+  described here. (This is handled in a DispatcherAgents collection's
+  ``deactivate`` method, in the full context of zc.async.)
+
+We need to explore a few elements of this, then: the ``fail`` method, retry
+policies, job errors, commit errors, and the ``handleInterrupt`` method.
+
+--------
+``fail``
+--------
+
+The ``fail`` method is an explicit way to fail a job. It can be called when the
+job has a NEW, PENDING, or ASSIGNED status. The use case for this method is to
+cancel a job that is overdue to start.  It defaults to a TimeoutError.
+
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> transaction.commit()
     >>> j.fail()
     >>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    Traceback (most recent call last):
-    ...
-    zc.async.interfaces.AbortedError:
+    Traceback...zc.async.interfaces.TimeoutError...
 
-``fail`` calls all failure callbacks with the failure.
+``fail`` calls all callbacks with the failure.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> j_callback = j.addCallbacks(failure=failure)
     >>> transaction.commit()
     >>> res = j.fail() # doctest: +ELLIPSIS
-    failure. [Failure instance: Traceback...zc.async.interfaces.AbortedError...]
+    failure. [Failure instance: Traceback...zc.async.interfaces.TimeoutError...]
 
-As seen above, it fails with zc.async.interfaces.AbortedError by default.
+As seen above, it fails with zc.async.interfaces.TimeoutError by default.
 You can also pass in a different error.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
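
The status checks in the new ``fail`` implementation can be tried out in
isolation. The following is a toy sketch under stated assumptions, not the real
``zc.async.job.Job``: ``ToyJob``, ``ToyBadStatusError`` and ``ToyTimeoutError``
are hypothetical stand-ins, the status constants are plain strings, the error
is stored directly rather than wrapped in a ``zc.twist.Failure``, and callbacks
and transactions are left out entirely.

```python
NEW, PENDING, ASSIGNED, ACTIVE, CALLBACKS, COMPLETED = (
    'new', 'pending', 'assigned', 'active', 'callbacks', 'completed')


class ToyBadStatusError(Exception):
    """Stand-in for zc.async.interfaces.BadStatusError."""


class ToyTimeoutError(Exception):
    """Stand-in for zc.async.interfaces.TimeoutError."""


class ToyJob(object):
    """Hypothetical model of the status gate in ``fail``; not a real Job."""

    def __init__(self, status=NEW):
        self.status = status
        self.result = None

    def fail(self, error=None):
        if self.status in (COMPLETED, ACTIVE):
            raise ToyBadStatusError(
                'can only call fail on a job with NEW, PENDING, or ASSIGNED '
                'status')
        if self.status != CALLBACKS:
            # Only store an error if a result has not been set already.
            if error is None:
                error = ToyTimeoutError()
            self.result = error
        # The real job resumes its callbacks here; the toy just finishes.
        self.status = COMPLETED
```

In this model a job in CALLBACKS status keeps whatever result it already had,
which is the behaviour that lets a retried ``fail`` call pick up where an
earlier, interrupted call left off.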
@@ -459,43 +498,856 @@
     ...
     exceptions.RuntimeError: failed
 
-As mentioned, if a dispatcher dies when working on an active task, the
-active task should be aborted using ``fail``, so the method also works if
-a job has the ACTIVE status.  We'll reach under the covers to show this.
+It won't work for failing tasks in ACTIVE or COMPLETED status.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> j._status = zc.async.interfaces.ACTIVE
     >>> transaction.commit()
     >>> j.fail()
-    >>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     Traceback (most recent call last):
     ...
-    zc.async.interfaces.AbortedError:
+    BadStatusError: can only call fail on a job with NEW, PENDING, or ASSIGNED status
 
-It won't work for failing tasks in COMPLETED or CALLBACKS status.
-
+    >>> j._status = zc.async.interfaces.NEW
     >>> j.fail()
+    >>> j.status == zc.async.interfaces.COMPLETED
+    True
+    >>> j.fail()
     Traceback (most recent call last):
     ...
-    BadStatusError: can only call fail on a job with NEW, PENDING, ASSIGNED, or ACTIVE status
+    BadStatusError: can only call fail on a job with NEW, PENDING, or ASSIGNED status
+
+It's a dirty secret that it will work in CALLBACKS status.  This is because we
+need to support retries in the face of internal commits.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> j._status = zc.async.interfaces.CALLBACKS
+    >>> transaction.commit()
     >>> j.fail()
-    Traceback (most recent call last):
+    >>> j.status == zc.async.interfaces.COMPLETED
+    True
+    >>> j.result is None # the default
+    True
+
+--------------
+Retry Policies
+--------------
+
+All of the other failure situations touch on retry policies, directly or
+indirectly.
+
+What is a retry policy?  It is used in three circumstances.
+
+- When the code the job ran fails, the job will try to call
+  ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to whether
+  the job should be retried.
+
+- When the commit fails, the job will try to call
+  ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
+  whether the job should be retried.
+
+- When the job starts but fails to complete because the system is interrupted,
+  the job will try to call ``retry_policy.interrupted()`` to get a boolean as
+  to whether the job should be retried.
+
+Why does this need to be a policy?  Can't it be a simpler arrangement?
+
+The heart of the problem is that different jobs need different error
+resolutions.
+
+In some cases, jobs may not be fully transactional.  For instance, the job
+may be communicating with an external system, such as a credit card system.
+The retry policy here should typically be "never": perhaps a callback should be
+in charge of determining what to do next.
+
+If a job is fully transactional, it can be retried.  But even then the desired
+behavior may differ.
+
+- In typical cases, some errors should simply cause a failure, while other
+  errors, such as database conflict errors, should cause a limited number of
+  retries.
+
+- In some jobs, conflict errors should be retried forever, because the job must
+  be run to completion or else the system should fall over. Callbacks that try
+  to handle errors themselves may take this approach, for instance.
+
+zc.async currently ships with three retry policies.
+
+1.  The default, appropriate for most fully transactional jobs, is the
+    zc.async.job.RetryCommonFourTimes.
+
+2.  The other available (pre-written) option for transactional jobs is
+    zc.async.job.RetryCommonForever. Callbacks will get this policy by
+    default.
+
+Both of these policies retry ZEO disconnects forever; and interrupts and
+transaction errors such as conflicts either four times (for a total of five
+attempts) or forever, respectively.
+
+3.  The last retry policy is zc.async.job.NeverRetry.  This is appropriate for
+    non-transactional jobs. You'll still typically need to handle errors in
+    your callbacks.
+
+Let's take a detailed look at these three in isolation before seeing them in
+practice below.
+
+We'll start with the default retry policy, RetryCommonFourTimes.  We can get it
+with ``getRetryPolicy``.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+    True
+    >>> verifyObject(zc.async.interfaces.IRetryPolicy, policy)
+    True
+
+Now we'll try out a few calls.  ``jobError`` and ``commitError`` both take a
+failure and a dictionary to stash data about the retry.  The job then calls
+the retry policy with the data dictionary before a commit using the
+``updateData`` method.
+
+Here's the policy requesting that a job be tried a total of five times,
+counting errors in the job and errors in the commit together.
+
+    >>> import zc.twist
+    >>> import ZODB.POSException
+    >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+    >>> data = {}
+
+    >>> policy.jobError(conflict, data)
+    True
+    >>> policy.jobError(conflict, data)
+    True
+    >>> policy.jobError(conflict, data)
+    True
+    >>> policy.jobError(conflict, data)
+    True
+    >>> policy.jobError(conflict, data)
+    False
+
+Now we've expired the total number of retries for this kind of error, so during
+commit it will fail immediately.
+
+    >>> policy.commitError(conflict, data)
+    False
+
+We'll reset the data (which holds the information until ``updateData`` is
+called to store the information persistently, so that aborted transactions
+don't discard the history from past attempts) and try again.
+
+    >>> data = {}
+    >>> policy.commitError(conflict, data)
+    True
+    >>> policy.commitError(conflict, data)
+    True
+    >>> policy.commitError(conflict, data)
+    True
+    >>> policy.commitError(conflict, data)
+    True
+    >>> policy.commitError(conflict, data)
+    False
+
+A ZEO ClientDisconnected error will be retried forever.  For this test, we
+treat 50 as close enough to "forever".  The policy uses time.sleep to back off;
+we'll stub that so we can see what happens.
+
+    >>> import time
+    >>> sleep_requests = []
+    >>> def sleep(i):
+    ...     sleep_requests.append(i)
     ...
-    BadStatusError: can only call fail on a job with NEW, PENDING, ASSIGNED, or ACTIVE status
+    >>> old_sleep = time.sleep
+    >>> time.sleep = sleep
 
-Using ``resumeCallbacks``
--------------------------
+    >>> import ZEO.Exceptions
+    >>> disconnect = zc.twist.Failure(ZEO.Exceptions.ClientDisconnected())
 
-So ``fail`` is the proper way to handle an active job that was being
-worked on by on eof a dead dispatcher's worker thread, but how does one
-handle a job that was in the CALLBACKS status?  The answer is to use
-resumeCallbacks.  Any job that is still pending will be called; any
-job that is active will be failed; any job that is in the middle
-of calling its own callbacks will have its ``resumeCallbacks`` called; and
-any job that is completed will be ignored.
+    >>> for i in range(50):
+    ...     if not policy.jobError(disconnect, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    50
 
+After jobError has raised the backoff to its maximum, it stays at 60 seconds
+for ClientDisconnected errors at commit.
+
+    >>> policy.commitError(disconnect, data)
+    True
+    >>> sleep_requests[50]
+    60
+
+Here's the backoff happening in commitError.
+
+    >>> del sleep_requests[:]
+    >>> data = {}
+    >>> for i in range(50):
+    ...     if not policy.commitError(disconnect, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    50
+
+If we encounter another kind of error, no retries are requested.
+
+    >>> runtime = zc.twist.Failure(RuntimeError())
+    >>> policy.jobError(runtime, data)
+    False
+    >>> policy.commitError(runtime, data)
+    False
+    >>> value = zc.twist.Failure(ValueError())
+    >>> policy.jobError(value, data)
+    False
+    >>> policy.commitError(value, data)
+    False
+
+When the system is interrupted, ``handleInterrupt`` uses the retry_policy's
+``interrupted`` method to determine what to do.  It does not need to pass a
+data dictionary.  The default policy will retry nine times, for a total of ten
+attempts.
+
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    False
+
+``updateData`` is only used after ``jobError`` and ``commitError``, and is
+called every time there is a commit attempt, so we're wildly out of order here,
+but we'll call the method now anyway to show we can, and then perform the job.
+
+    >>> transaction.commit()
+    >>> policy.updateData(data)
+    >>> j()
+    10
+
+The policy may also want to perform additional logging.
+
+The other policies perform similarly. [#show_other_policies]_
+
+To change a policy on a job, you have two options. First, you can change the
+``retry_policy_factory`` on your instance.
+
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> j.retry_policy_factory = zc.async.job.NeverRetry
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.NeverRetry)
+    True
+
+Second, you can leverage the fact that the default policy tries to adapt the
+job to zc.async.interfaces.IRetryPolicy (with the default name of ''), and only
+if that fails does it use RetryCommonFourTimes.
+
+    >>> import zope.component
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.RetryCommonForever,
+    ...     provides=zc.async.interfaces.IRetryPolicy)
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonForever)
+    True
+
+    >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+    ...     zc.async.job.RetryCommonForever,
+    ...     provided=zc.async.interfaces.IRetryPolicy) # tearDown
+    True
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+    True
+
+If you are working with callbacks, the default retry policy is
+``RetryCommonForever``.
+
+    >>> def foo(result):
+    ...     print result
+    ...
+    >>> callback = j.addCallback(foo)
+    >>> policy = callback.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonForever)
+    True
+
+This can be changed in the same kind of way as a non-callback job.  You can
+set the ``retry_policy_factory``.
+
+    >>> callback = j.addCallback(foo)
+    >>> callback.retry_policy_factory = zc.async.job.NeverRetry
+    >>> policy = callback.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.NeverRetry)
+    True
+
+You can also register an adapter.  In this case it must be an adapter
+registered with the 'callback' name.
+
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.RetryCommonFourTimes,
+    ...     provides=zc.async.interfaces.IRetryPolicy, name='callback')
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> callback = j.addCallback(foo)
+    >>> policy = callback.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+    True
+
+    >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+    ...     zc.async.job.RetryCommonFourTimes,
+    ...     provided=zc.async.interfaces.IRetryPolicy, name='callback') # tearDown
+    True
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> callback = j.addCallback(foo)
+    >>> policy = callback.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonForever)
+    True
+
+Finally, it is worth noting that, typically, once a retry policy has been
+obtained with ``getRetryPolicy``, changing the factory or adapter registration
+will not change the policy: it has already been instantiated and stored on the
+job.
+
+    >>> callback.retry_policy_factory = zc.async.job.NeverRetry
+    >>> isinstance(callback.getRetryPolicy(), zc.async.job.NeverRetry)
+    False
+    >>> policy is callback.getRetryPolicy()
+    True
+
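The lookup order and caching described above can be sketched in plain Python. This is an illustration under stated assumptions, not zc.async's implementation: the ``ADAPTERS`` dictionary is a hypothetical stand-in for named ``IRetryPolicy`` adapter registrations, ``is_callback`` is an invented flag, and the policy classes are empty placeholders that only borrow the names used in the text.

```python
# Illustrative sketch only: plain-Python stand-ins, not zc.async's code.

class _Policy:
    def __init__(self, job):
        self.job = job

class NeverRetry(_Policy):
    pass

class RetryCommonFourTimes(_Policy):
    pass

class RetryCommonForever(_Policy):
    pass

# Hypothetical registry standing in for named IRetryPolicy adapters:
# '' is used for ordinary jobs, 'callback' for callbacks.
ADAPTERS = {}

class Job:
    retry_policy_factory = None  # per-instance override, if set
    _retry_policy = None         # cached policy instance
    is_callback = False          # invented flag for this sketch

    def getRetryPolicy(self):
        # Once a policy has been created it is reused, so later changes
        # to the factory or the registry have no effect on this job.
        if self._retry_policy is not None:
            return self._retry_policy
        name = 'callback' if self.is_callback else ''
        factory = self.retry_policy_factory or ADAPTERS.get(name)
        if factory is None:
            # Fall back to the defaults named in the text.
            factory = (RetryCommonForever if self.is_callback
                       else RetryCommonFourTimes)
        self._retry_policy = factory(self)
        return self._retry_policy

job = Job()
first = job.getRetryPolicy()
job.retry_policy_factory = NeverRetry  # too late: policy already stored
print(type(first).__name__, job.getRetryPolicy() is first)
```

The point of the sketch is the ordering: instance factory first, then a registered adapter, then the built-in default, with the result stored on the job.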
+Now we'll look at the standard retry policies in use as we examine more failure
+scenarios below.
+
+---------------------
+Failures in Your Code
+---------------------
+
+As seen above, the default retry policy generally does not retry failures in
+your code: they are passed on as a Failure object in the job's result.  The
+exceptions are ConflictErrors (tried five times in total) and
+ClientDisconnected (retried forever).  Let's manufacture some of these problems.
+
+First, a few "normal" errors.  They just go to the result.
+
+    >>> count = 0
+    >>> max = 50
+    >>> def raiseAnError(klass):
+    ...     global count
+    ...     count += 1
+    ...     if count < max:
+    ...         raise klass()
+    ...     else:
+    ...         print 'tried %d times.  stopping.' % (count,)
+    ...         return 42
+    ...
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError, ValueError)
+    >>> transaction.commit()
+    >>> job()
+    <zc.twist.Failure exceptions.ValueError>
+    >>> job.result
+    <zc.twist.Failure exceptions.ValueError>
+    >>> count
+    1
+    >>> count = 0
+
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+    >>> transaction.commit()
+    >>> job()
+    <zc.twist.Failure exceptions.TypeError>
+    >>> job.result
+    <zc.twist.Failure exceptions.TypeError>
+    >>> count
+    1
+    >>> count = 0
+
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError, RuntimeError)
+    >>> transaction.commit()
+    >>> job()
+    <zc.twist.Failure exceptions.RuntimeError>
+    >>> job.result
+    <zc.twist.Failure exceptions.RuntimeError>
+    >>> count
+    1
+    >>> count = 0
+
+If we raise a ConflictError (or any TransactionError), that will try five
+times, and then be let through, so it will look very similar to the previous
+examples except for our ``count`` variable.
+
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+    ...                                    ZODB.POSException.ConflictError)
+    >>> transaction.commit()
+    >>> job()
+    <zc.twist.Failure ZODB.POSException.ConflictError>
+    >>> job.result
+    <zc.twist.Failure ZODB.POSException.ConflictError>
+    >>> count
+    5
+    >>> count = 0
+
+It's worth showing that, as you'd expect, if the job succeeds within the
+allotted retries, everything is fine.
+
+    >>> max = 3
+
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+    ...                                    ZODB.POSException.ConflictError)
+    >>> transaction.commit()
+    >>> job()
+    tried 3 times.  stopping.
+    42
+    >>> job.result
+    42
+    >>> count
+    3
+    >>> count = 0
+
+If we raise a ClientDisconnected error, that will retry forever, with a
+backoff.  Our job doesn't let this happen, but the retry policy would.
+
+    >>> max = 50
+    >>> del sleep_requests[:]
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+    ...                                    ZEO.Exceptions.ClientDisconnected)
+    >>> transaction.commit()
+    >>> job()
+    tried 50 times.  stopping.
+    42
+    >>> count
+    50
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    49
+    >>> count = 0
+
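The sleep requests recorded above look like a linear backoff with a ceiling. A minimal sketch that reproduces that sequence follows; the function name and the ``step`` and ``cap`` parameters are assumptions chosen to match the observed output, and the values hidden by the ellipsis are assumed to all be 60.

```python
def backoff_delays(retries, step=5, cap=60):
    """Return the delay requested before each retry: step, 2*step, ..., capped."""
    return [min(step * n, cap) for n in range(1, retries + 1)]

# 50 tries means 49 retries, hence 49 sleep requests.
delays = backoff_delays(49)
print(delays[:13])  # [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 60]
print(len(delays))  # 49
```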
+None of the provided retry policies utilize this functionality, but the
+pertinent retry policy method, ``jobError``, can return not only ``True``
+(retry now) or ``False`` (do not retry) but also a datetime.datetime or a
+datetime.timedelta.  Datetimes and timedeltas indicate that the policy requests
+that the job be returned to the queue to be claimed again.  Let's give an
+example of this.
+
+As implied by this description, this functionality only makes sense within
+a fuller zc.async context.  We need a stub Agent as a job's parent, and a stub
+Queue in its ``parent`` lineage.
+
+    >>> class StubQueue(persistent.Persistent):
+    ...     zope.interface.implements(zc.async.interfaces.IQueue)
+    ...     def putBack(self, job):
+    ...         self.put_back = job
+    ...         job.parent = self
+    ...     def put(self, job, begin_after=None):
+    ...         self.put_job = job
+    ...         self.put_begin_after = begin_after
+    ...         job.parent = self
+    ...
+    >>> class StubAgent(persistent.Persistent):
+    ...     zope.interface.implements(zc.async.interfaces.IAgent)
+    ...     def __init__(self):
+    ...         # usually would have dispatcher agent parent, which would then
+    ...         # have queue as parent, but this is a stub
+    ...         self.parent = StubQueue()
+    ...     def jobCompleted(self, job):
+    ...         self.completed = job
+    ...     def remove(self, job):
+    ...         job.parent = None
+    ...         self.removed = job
+    ...
+
+We'll also need a quick stub retry policy that takes advantage of this
+functionality.
+
+    >>> import persistent
+    >>> import datetime
+    >>> class StubRescheduleRetryPolicy(persistent.Persistent):
+    ...     zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+    ...     _reply = datetime.timedelta(hours=1)
+    ...     def __init__(self, job):
+    ...         self.parent = self.__parent__ = job
+    ...     def jobError(self, failure, data_cache):
+    ...         return self._reply
+    ...     def commitError(self, failure, data_cache):
+    ...         return self._reply
+    ...     def interrupted(self):
+    ...         return self._reply
+    ...     def updateData(self, data_cache):
+    ...         pass
+    ...
+
+    >>> j = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+    >>> agent = j.parent = StubAgent()
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> transaction.commit()
+    >>> j is j()
+    True
+
+Notice above that, when this happens, the result of the call is the job itself.
+
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> agent.removed is j
+    True
+    >>> agent.parent.put_job is j
+    True
+    >>> agent.parent.put_begin_after
+    datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+    >>> import pytz
+    >>> j = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+    >>> agent = j.parent = StubAgent()
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+    >>> transaction.commit()
+    >>> j is j()
+    True
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> agent.removed is j
+    True
+    >>> agent.parent.put_job is j
+    True
+    >>> agent.parent.put_begin_after
+    datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
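One way to read the two examples above: a timedelta reply is added to the current time, and a naive datetime reply is treated as UTC. The sketch below shows that interpretation. ``resolve_begin_after`` is a hypothetical helper, not a zc.async function; it uses the standard library's ``datetime.timezone.utc`` rather than pytz, and it assumes the pinned test time is one hour before the value shown in the first example.

```python
import datetime

UTC = datetime.timezone.utc

def resolve_begin_after(reply, now):
    """Map a policy reply to a reschedule time, or None if none is requested."""
    if isinstance(reply, datetime.timedelta):
        # Relative request: reschedule this long after "now".
        return now + reply
    if isinstance(reply, datetime.datetime):
        # Absolute request: treat a naive datetime as UTC.
        if reply.tzinfo is None:
            return reply.replace(tzinfo=UTC)
        return reply
    # True or False: retry immediately or give up, so no reschedule time.
    return None

# Assumed pinned time, one hour before the value in the first example.
now = datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=UTC)
print(resolve_begin_after(datetime.timedelta(hours=1), now))
print(resolve_begin_after(datetime.datetime(3000, 1, 1), now))
```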
+------------------
+Failures on Commit
+------------------
+
+Failures on commit are handled very similarly to those occurring while
+performing the job.
+
+We'll have to hack ``TransactionManager.commit`` to show this.
+
+    >>> from transaction import TransactionManager
+    >>> old_commit = TransactionManager.commit
+    >>> commit_count = 0
+    >>> error = None
+    >>> max = 2
+    >>> allow_commits = [1] # change state to active
+    >>> def new_commit(self):
+    ...     global commit_count
+    ...     commit_count += 1
+    ...     if commit_count in allow_commits:
+    ...         old_commit(self) # changing state to "active" or similar
+    ...     elif commit_count - len(allow_commits) < max:
+    ...         raise error()
+    ...     else:
+    ...         if commit_count - len(allow_commits) == max:
+    ...             print 'tried %d time(s).  committing.' % (
+    ...                 commit_count-len(allow_commits))
+    ...         old_commit(self)
+    ...
+    >>> TransactionManager.commit = new_commit
+    >>> count = 0
+    >>> def count_calls():
+    ...     global count
+    ...     count += 1
+    ...     return 42
+    ...
+
+Normal errors will simply pass through with only one try.
+
+    >>> error = ValueError
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 2 time(s).  committing.
+    <zc.twist.Failure exceptions.ValueError>
+    >>> count
+    1
+
+Note that, since the error at commit is obscuring the original result, the
+original result is logged.
+
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'INFO':
+    ...         break
+    ... else:
+    ...     assert False, 'could not find log'
+    ...
+    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Commit failed ... Prior to this, job succeeded with result: 42
+
+This happens both for non-error results (as above) and for error results.
+
+    >>> count = commit_count = 0
+    >>> allow_commits = [1, 2, 3] # set to active, then get retry policy, then call
+    >>> j = root['j'] = zc.async.job.Job(raiseAnError, RuntimeError)
+    >>> transaction.commit()
+    >>> j()
+    tried 2 time(s).  committing.
+    <zc.twist.Failure exceptions.ValueError>
+    >>> found = []
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'ERROR':
+    ...         found.append(r)
+    ...         if len(found) == 2:
+    ...             break
+    ... else:
+    ...     assert False, 'could not find log'
+    ...
+    >>> print found[1].getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Commit failed ... Prior to this, job failed with traceback:
+    ...
+    exceptions.RuntimeError...
+
+The ValueError, from transaction time, is the main error recorded.
+
+    >>> print found[0].getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    <...Job...raiseAnError... failed with traceback:
+    ...
+    exceptions.ValueError...
+
+The conflict error will be tried five times.
+
+    >>> max = 6
+    >>> count = commit_count = 0
+    >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13] # set active state, then get retries
+    >>> error = ZODB.POSException.ConflictError
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 6 time(s).  committing.
+    <zc.twist.Failure ZODB.POSException.ConflictError>
+    >>> count
+    5
+
+If it succeeds in the number of tries allotted, the result will be fine.
+
+    >>> max = 3
+    >>> count = commit_count = 0
+    >>> allow_commits = [1, 3, 4, 6, 7]
+    >>> error = ZODB.POSException.ConflictError
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 3 time(s).  committing.
+    42
+    >>> count
+    3
+
+As an aside, the code regards getting the desired retry policy as a requirement
+and so will try until it does so.  Here's a repeat of the previous example,
+with some retries getting a failed commit.  Everything except a ConflictError
+has a backoff.
+
+    >>> max = 25
+    >>> count = commit_count = 0
+    >>> allow_commits = [1]
+    >>> error = ZODB.POSException.ConflictError
+    >>> del sleep_requests[:]
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 25 time(s).  committing.
+    42
+    >>> count
+    2
+    >>> sleep_requests
+    []
+
+The ClientDisconnected will be tried forever until it succeeds, with a backoff.
+We're putting in some irregular patterns of commits to test that these also
+work, which skews some of the results.
+
+    >>> max = 50
+    >>> count = commit_count = 0
+    >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 22, 25]
+    >>> del sleep_requests[:]
+    >>> error = ZEO.Exceptions.ClientDisconnected
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 50 time(s).  committing.
+    42
+    >>> count
+    9
+    >>> len(sleep_requests)
+    51
+
+As with job errors, none of the provided retry policies utilize this
+functionality, but the pertinent retry policy method, here ``commitError``,
+can return not only ``True`` (retry now) or ``False`` (do not retry) but also
+a datetime.datetime or a datetime.timedelta.  Datetimes and timedeltas
+indicate that the policy requests that the job be returned to the queue to be
+claimed again.  Let's give an example of this.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> agent = j.parent = StubAgent()
+    >>> StubRescheduleRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> count = commit_count = 0
+    >>> max = 1
+    >>> error = ValueError
+    >>> transaction.commit()
+    >>> j is j()
+    True
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> agent.removed is j
+    True
+    >>> agent.parent.put_job is j
+    True
+    >>> agent.parent.put_begin_after
+    datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> agent = j.parent = StubAgent()
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+    >>> count = commit_count = 0
+    >>> transaction.commit()
+    >>> j is j()
+    True
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> agent.removed is j
+    True
+    >>> agent.parent.put_job is j
+    True
+    >>> agent.parent.put_begin_after
+    datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+All of this looks very similar to job errors. The only big difference is that
+the Failure recording a failed commit *must* itself be committed: the job will
+try forever until that commit succeeds or the job is interrupted.
+Here's an example with a ConflictError.
+
+    >>> count = commit_count = 0
+    >>> max = 50
+    >>> error = ZODB.POSException.ConflictError
+    >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13]
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 50 time(s).  committing.
+    <zc.twist.Failure ZODB.POSException.ConflictError>
+    >>> count
+    5
+
+    >>> TransactionManager.commit = old_commit
+
+    >>> time.sleep = old_sleep # tearDown
+
+-------------------
+``handleInterrupt``
+-------------------
+
+Not infrequently, systems will be stopped while jobs are in the middle of their
+work.  When the clean-up occurs, the job needs to figure out what to do to
+handle the interruption.  The right thing to do is to call ``handleInterrupt``.
+This method itself defers to a retry policy to determine what to do.
+
+This method takes no arguments. The default behavior is to allow up to 10
+interruptions. It expects a parent agent to reschedule it in a queue.  Let's
+give an example.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> queue = j.parent = StubQueue()
+    >>> j._status = zc.async.interfaces.ACTIVE
+    >>> transaction.commit()
+    >>> j.handleInterrupt()
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> queue.put_back is j
+    True
+
+After nine interruptions, the default policy gives up on the tenth.
+
+    >>> for i in range(8):
+    ...     j.parent = agent
+    ...     j._status = zc.async.interfaces.ACTIVE
+    ...     j.handleInterrupt()
+    ...     if j.status != zc.async.interfaces.PENDING:
+    ...         print 'error', i, j.status
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> j.parent = agent
+    >>> j._status = zc.async.interfaces.ACTIVE
+    >>> j.handleInterrupt()
+    >>> j.status == zc.async.interfaces.COMPLETED
+    True
+    >>> j.result
+    <zc.twist.Failure zc.async.interfaces.AbortedError>
+
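The give-up behavior shown above can be pictured as a policy that counts interruptions. The class below is a hypothetical illustration, not the policy shipped with zc.async; the limit of nine is taken from the example, where the tenth interruption completes the job with an ``AbortedError``.

```python
class CountingInterruptPolicy:
    """Hypothetical policy: tolerate a fixed number of interruptions."""

    def __init__(self, max_interruptions=9):
        self.max_interruptions = max_interruptions
        self.interruptions = 0

    def interrupted(self):
        self.interruptions += 1
        # True asks for the job to be rescheduled; False means stop and
        # let the job record a failure instead.
        return self.interruptions <= self.max_interruptions

policy = CountingInterruptPolicy()
replies = [policy.interrupted() for _ in range(10)]
print(replies.count(True), replies[-1])  # 9 False
```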
+If the retry policy returns True, as happens above, this should be interpreted
+by the agent and queue as a request to immediately reschedule the job. The
+policy can also return a datetime.timedelta or datetime.datetime to ask that
+the job be rescheduled for a later time.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> queue = j.parent = StubQueue()
+    >>> StubRescheduleRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> j._status = zc.async.interfaces.ACTIVE
+    >>> transaction.commit()
+    >>> j.handleInterrupt()
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> queue.put_job is j
+    True
+    >>> queue.put_begin_after
+    datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> queue = j.parent = StubQueue()
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+    >>> j._status = zc.async.interfaces.ACTIVE
+    >>> transaction.commit()
+    >>> j.handleInterrupt()
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> queue.put_job is j
+    True
+    >>> queue.put_begin_after
+    datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+In a full zc.async context, rescheduling is more of a dance between the job,
+the agent, and the queue.  See `catastrophes.txt` for more information.
+
+-------------------
+``resumeCallbacks``
+-------------------
+
+``handleInterrupt`` should be called when a job was working on its own code.
+But what happens if the system stops while a job was working on its callbacks?
+When the job is handled, the system should call ``resumeCallbacks``. The
+method will call any callback that is still pending, and not timed out because
+of ``begin_by``; it will call ``fail`` on any callback that is pending and
+timed out; it will call ``handleInterrupt`` on any callback that is active; it
+will call ``resumeCallbacks`` on any callback that itself has the CALLBACKS
+status; and will ignore any job that is completed.  As such, it encompasses
+all of the retry behavior discussed above.  Moreover, while it does not use
+retry policies directly, indirectly they are often used.
+
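The dispatch rules in the paragraph above can be summarized as a small sketch. Everything here is a stand-in: the status constants are plain strings, ``StubCallback`` only records which action was taken, and ``resume`` is a hypothetical function that mirrors the described rules rather than the real ``resumeCallbacks`` code.

```python
PENDING, ACTIVE, CALLBACKS, COMPLETED = (
    'pending', 'active', 'callbacks', 'completed')

class StubCallback:
    """Stand-in callback that records which action was applied to it."""

    def __init__(self, status, timed_out=False):
        self.status = status
        self.timed_out = timed_out  # stands in for a passed ``begin_by``
        self.action = None

    def __call__(self):
        self.action = 'called'

    def fail(self):
        self.action = 'failed'

    def handleInterrupt(self):
        self.action = 'interrupted'

    def resumeCallbacks(self):
        self.action = 'resumed'

def resume(callbacks):
    for cb in callbacks:
        if cb.status == PENDING:
            if cb.timed_out:
                cb.fail()          # pending but too late to start
            else:
                cb()               # pending and still in time
        elif cb.status == ACTIVE:
            cb.handleInterrupt()   # was running when the system stopped
        elif cb.status == CALLBACKS:
            cb.resumeCallbacks()   # recurse into its own callbacks
        # COMPLETED callbacks are ignored.

callbacks = [StubCallback(PENDING), StubCallback(PENDING, timed_out=True),
             StubCallback(ACTIVE), StubCallback(CALLBACKS),
             StubCallback(COMPLETED)]
resume(callbacks)
print([cb.action for cb in callbacks])
```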
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> j._result = 10
     >>> j._status = zc.async.interfaces.CALLBACKS
     >>> completed_j = zc.async.job.Job(multiply, 3)
@@ -517,11 +1369,8 @@
     80
     >>> sub_callbacks_j.status == zc.async.interfaces.COMPLETED
     True
-    >>> print active_j.result.getTraceback()
-    ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    Traceback (most recent call last):
-    ...
-    zc.async.interfaces.AbortedError:
+    >>> active_j.result # was retried because of default retry policy
+    50
     >>> active_j.status == zc.async.interfaces.COMPLETED
     True
     >>> pending_j.result
@@ -529,8 +1378,12 @@
     >>> pending_j.status == zc.async.interfaces.COMPLETED
     True
 
-[#aborted_active_callback_log]_
+[#retried_active_callback_log]_
 
+Introspection
+=============
+
+------------------------------------
 Introspecting and Mutating Arguments
 ------------------------------------
 
@@ -567,6 +1420,7 @@
     >>> res = j() # doctest: +ELLIPSIS
     <zc.async.job.Job (oid ...>
 
+-----------------
 Result and Status
 -----------------
 
@@ -673,6 +1527,7 @@
 - Also, a job's direct callback may not call the job
   [#callback_self]_.
 
+----------------------
 More Job Introspection
 ----------------------
 
@@ -769,6 +1624,8 @@
     True
     >>> transaction.abort()
 
+    >>> time.sleep = old_sleep # probably put in test tearDown
+
 =========
 Footnotes
 =========
@@ -783,15 +1640,17 @@
     >>> root = conn.root()
     >>> import zc.async.configure
     >>> zc.async.configure.base()
+    >>> import zc.async.testing
+    >>> zc.async.testing.setUpDatetime() # pins datetimes
 
 .. [#verify] Verify interface
 
     >>> from zope.interface.verify import verifyObject
     >>> verifyObject(zc.async.interfaces.IJob, j)
     True
-    
+
     Note that status and result are readonly.
-    
+
     >>> j.status = 1
     Traceback (most recent call last):
     ...
@@ -804,9 +1663,9 @@
 .. [#no_live_frames] Failures have two particularly dangerous bits: the
     traceback and the stack.  We use the __getstate__ code on Failures
     to clean them up.  This makes the traceback (``tb``) None...
-    
+
     >>> j.result.tb # None
-    
+
     ...and it makes all of the values in the stack--the locals and
     globals-- into strings.  The stack is a list of lists, in which each
     internal list represents a frame, and contains five elements: the
@@ -814,14 +1673,14 @@
     the line number (``f_lineno``), an items list of the locals, and an
     items list for the globals.  All of the values in the items list
     would normally be objects, but are now strings.
-    
+
     >>> for (codename, filename, lineno, local_i, global_i) in j.result.stack:
     ...     for k, v in local_i:
     ...         assert isinstance(v, basestring), 'bad local %s' % (v,)
     ...     for k, v in global_i:
     ...         assert isinstance(v, basestring), 'bad global %s' % (v,)
     ...
-    
+
     Here's a reasonable question.  The Twisted Failure code has a
     __getstate__ that cleans up the failure, and that's even what we are
     using to sanitize the failure.  If the failure is attached to a
@@ -845,7 +1704,7 @@
     desired changes to the transaction (such as aborting) before the
     job calls commit.  Compare.  Here is a call that raises an
     exception, and rolls back changes.
-    
+
     (Note that we are passing arguments to the job, a topic that has
     not yet been discussed in the text when this footnote is given: read
     on a bit in the main text to see the details, if it seems surprising
@@ -942,14 +1801,16 @@
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    <zc.async.job.Job (oid 114, db 'unnamed')
+    <zc.async.job.Job (oid ..., db 'unnamed')
      ``zc.async.job.completeStartedJobArguments(zc.async.job.Job
-                                                (oid 109, db 'unnamed'))``>
+                                                (oid ..., db 'unnamed'))``>
     succeeded with result:
     None
 
-    However, the callbacks that fail themselves are logged (again, as all jobs
-    are) at the "error" level and include a detailed traceback.
+    However, the callbacks that fail themselves are logged (by default at a
+    "CRITICAL" level because callbacks are often in charge of error handling,
+    and having an error in your error handler may be a serious problem) and
+    include a detailed traceback.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 4)
     >>> transaction.commit()
@@ -957,8 +1818,8 @@
     >>> j() # doctest: +ELLIPSIS
     20
 
-    >>> for r in reversed(trace_logs.records):
-    ...     if r.levelname == 'ERROR':
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'CRITICAL':
     ...         break
     ... else:
     ...     assert False, 'could not find log'
@@ -966,29 +1827,204 @@
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     <...Job...``zc.async.doctest_test.multiply()``> failed with traceback:
     *--- Failure #... (pickled) ---
-    .../zc/async/job.py:...
+    ...job.py:...
      [ Locals...
-      res : 'None...
-     ( Globals...
-    .../zc/async/job.py:...
-     [ Locals...
       effective_args : '[20]...
      ( Globals...
     exceptions.TypeError: multiply() takes at least 2 arguments (1 given)
     *--- End of Failure #... ---
     <BLANKLINE>
 
-.. [#aborted_active_callback_log] The fact that the pseudo-aborted "active"
-    job failed is logged.
+.. [#show_other_policies]
 
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> j.retry_policy_factory = zc.async.job.RetryCommonForever
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonForever)
+    True
+
+    Here's the policy requesting that a job be tried "forever" (we show 50
+    times), both in the job and in the commit.
+
+    >>> import zc.twist
+    >>> import ZODB.POSException
+    >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+    >>> data = {}
+
+    >>> for i in range(50):
+    ...     if not policy.jobError(conflict, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    >>> for i in range(50):
+    ...     if not policy.commitError(conflict, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    A ZEO ClientDisconnected error will also be retried forever, but with a
+    backoff.
+
+    >>> import ZEO.Exceptions
+    >>> disconnect = zc.twist.Failure(ZEO.Exceptions.ClientDisconnected())
+    >>> del sleep_requests[:]
+
+    >>> for i in range(50):
+    ...     if not policy.jobError(disconnect, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    50
+    >>> del sleep_requests[:]
+
+    >>> data = {}
+    >>> for i in range(50):
+    ...     if not policy.commitError(disconnect, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    50
+
+    If we encounter another kind of error during the job, no retries are
+    requested.
+
+    >>> runtime = zc.twist.Failure(RuntimeError())
+    >>> policy.jobError(runtime, data)
+    False
+    >>> value = zc.twist.Failure(ValueError())
+    >>> policy.jobError(value, data)
+    False
+
+    However, during the commit, any kind of error will cause a retry, forever.
+
+    >>> for i in range(50):
+    ...     if not policy.commitError(runtime, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    >>> for i in range(50):
+    ...     if not policy.commitError(value, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    When the system is interrupted, ``handleInterrupt`` uses the retry_policy's
+    ``interrupted`` method to determine what to do.  It does not need to pass a
+    data dictionary.  This also happens forever.
+
+    >>> for i in range(50):
+    ...     if not policy.interrupted():
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    ``updateData`` is only used after ``jobError`` and ``commitError``, and is
+    called every time there is a commit attempt, so we're wildly out of order here,
+    but we'll call the method now anyway to show we can, and then perform the job.
+
+    >>> transaction.commit()
+    >>> policy.updateData(data)
+    >>> j()
+    10
+
+Now we will show ``NeverRetry``. As the name implies, this is a simple policy
+for jobs that should never retry.  Typical reasons for this are that the job is
+talking to an external system or doing something else that is effectively not
+transactional.  More sophisticated policies for specific versions of this
+scenario may be possible; however, also consider callbacks for this use case.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> j.retry_policy_factory = zc.async.job.NeverRetry
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.NeverRetry)
+    True
+
+    So, here's a bunch of "no"s.
+
+    >>> import zc.twist
+    >>> import ZODB.POSException
+    >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+    >>> data = {}
+
+    >>> policy.jobError(conflict, data)
+    False
+
+    >>> policy.commitError(conflict, data)
+    False
+
+    >>> policy.jobError(disconnect, data)
+    False
+
+    >>> policy.commitError(disconnect, data)
+    False
+
+    >>> runtime = zc.twist.Failure(RuntimeError())
+    >>> policy.jobError(runtime, data)
+    False
+
+    >>> policy.commitError(runtime, data)
+    False
+
+    >>> value = zc.twist.Failure(ValueError())
+    >>> policy.jobError(value, data)
+    False
+
+    >>> policy.commitError(value, data)
+    False
+
+    >>> policy.interrupted()
+    False
+
+    ``updateData`` is only used after ``jobError`` and ``commitError``, and is
+    called every time there is a commit attempt, so we're wildly out of order here,
+    but we'll call the method now anyway to show we can, and then perform the job.
+
+    >>> transaction.commit()
+    >>> policy.updateData(data)
+    >>> j()
+    10
+
+.. [#retried_active_callback_log] The fact that the pseudo-aborted "active"
+    job was retried is logged.
+
     >>> for r in reversed(trace_logs.records):
-    ...     if r.levelname == 'DEBUG' and r.getMessage().startswith('failing'):
+    ...     if (r.levelname == 'DEBUG' and
+    ...         r.getMessage().startswith('retrying interrupted')):
     ...         break
     ... else:
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    failing aborted callback
+    retrying interrupted callback
     <...Job...``zc.async.doctest_test.multiply(5)``> to <...Job...>
 
 .. [#call_self] Here's a job trying to call itself.

Modified: zc.async/trunk/src/zc/async/queue.py
===================================================================
--- zc.async/trunk/src/zc/async/queue.py	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/queue.py	2008-06-20 01:18:18 UTC (rev 87584)
@@ -13,6 +13,7 @@
 ##############################################################################
 import datetime
 import bisect
+import logging
 import pytz
 import persistent
 import persistent.interfaces
@@ -23,6 +24,7 @@
 import zope.component
 import zope.event
 import zope.bforest
+import zope.minmax
 import zc.queue
 import zc.dict
 
@@ -43,22 +45,20 @@
 
     UUID = None
     activated = None
-    
+
     def __init__(self, uuid):
         super(DispatcherAgents, self).__init__()
         self.UUID = uuid
-        self.__class__.last_ping.initialize(self)
-    
-    zc.async.utils.createAtom('last_ping', None)
-    
+        self.last_ping = zope.minmax.Maximum()
+
     ping_interval = datetime.timedelta(seconds=30)
     ping_death_interval = datetime.timedelta(seconds=60)
 
     @property
     def dead(self):
-        last_ping = self.last_ping
+        last_ping = self.last_ping.value
         if self.activated and (
-            self.last_ping is None or self.activated > self.last_ping):
+            last_ping is None or self.activated > last_ping):
             last_ping = self.activated
         elif last_ping is None:
             return False
@@ -103,27 +103,43 @@
             else:
                 while job is not None:
                     status = job.status
-                    if status == zc.async.interfaces.ASSIGNED:
+                    if status in (zc.async.interfaces.PENDING,
+                                  zc.async.interfaces.ASSIGNED):
+                        # odd
+                        zc.async.utils.log.warning(
+                            'unexpected job status %s for %r; treating as NEW',
+                            status, job)
+                        status = zc.async.interfaces.NEW
+                    if status == zc.async.interfaces.NEW:
                         tmp = job.assignerUUID
                         job.assignerUUID = None
                         job.parent = None
                         queue.put(job)
                         job.assignerUUID = tmp
                     elif job.status == zc.async.interfaces.ACTIVE:
-                        queue.put(job.fail)
+                        j = queue.put(
+                            job.handleInterrupt,
+                            retry_policy_factory=zc.async.job.RetryCommonForever,
+                            failure_log_level=logging.CRITICAL)
                     elif job.status == zc.async.interfaces.CALLBACKS:
-                        queue.put(job.resumeCallbacks)
+                        j = queue.put(
+                            job.resumeCallbacks,
+                            retry_policy_factory=zc.async.job.RetryCommonForever,
+                            failure_log_level=logging.CRITICAL)
                     elif job.status == zc.async.interfaces.COMPLETED:
                         # huh, that's odd.
                         agent.completed.add(job)
+                        zc.async.utils.log.warning(
+                            'unexpectedly had to inform agent of completion '
+                            'of %r', job)
                     try:
                         job = agent.pull()
                     except IndexError:
                         job = None
         zope.event.notify(
             zc.async.interfaces.DispatcherDeactivated(self))
-        
 
+
 class Queues(zc.async.utils.Dict):
 
     def __setitem__(self, key, value):
@@ -161,9 +177,10 @@
         if not da.activated:
             raise ValueError('UUID is not activated.')
         now = datetime.datetime.now(pytz.UTC)
-        if (da.last_ping is None or
-            da.last_ping + da.ping_interval <= now):
-            da.last_ping = now
+        last_ping = da.last_ping.value
+        if (last_ping is None or
+            last_ping + da.ping_interval <= now):
+            da.last_ping.value = now
         next = self._getNextActiveSibling(uuid)
         if next is not None and next.dead:
             # `next` seems to be a dead dispatcher.
@@ -188,28 +205,37 @@
         self.size = size
 
     def clean(self):
+        now = datetime.datetime.now(pytz.UTC)
         for i, job in enumerate(reversed(self._data)):
-            if job.status in (
-                zc.async.interfaces.CALLBACKS,
-                zc.async.interfaces.COMPLETED):
+            status = job.status
+            if status in (zc.async.interfaces.CALLBACKS,
+                          zc.async.interfaces.COMPLETED) or (
+                status == zc.async.interfaces.PENDING and
+                job.begin_after > now): # for a rescheduled task
                 self._data.pull(-1-i)
 
     @property
     def filled(self):
         return len(self._data) >= self.size
 
+    def __contains__(self, item):
+        for i in self:
+            if i is item:
+                return True
+        return False
+
     def add(self, item):
+        if item in self:
+            return
         if not zc.async.interfaces.IJob.providedBy(item):
             raise ValueError('must be IJob')
         if self.name not in item.quota_names:
             raise ValueError('quota name must be in quota_names')
-        # self.clean()
         if self.filled:
             raise ValueError('Quota is filled')
         self._data.put(item)
 
-    for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'get', 
-               '__contains__'):
+    for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'get'):
         locals()[nm] = zc.async.utils.simpleWrapper(nm)
 
 
@@ -229,6 +255,8 @@
 class Queue(zc.async.utils.Base):
     zope.interface.implements(zc.async.interfaces.IQueue)
 
+    _putback_queue = None
+
     def __init__(self):
         self._queue = zc.queue.CompositeQueue()
         self._held = BTrees.OOBTree.OOBTree()
@@ -238,9 +266,14 @@
         self.dispatchers = Dispatchers()
         self.dispatchers.__parent__ = self
 
-    def put(self, item, begin_after=None, begin_by=None):
+    def put(self, item, begin_after=None, begin_by=None,
+            failure_log_level=None, retry_policy_factory=None):
         item = zc.async.interfaces.IJob(item)
-        if item.assignerUUID is not None:
+        if failure_log_level is not None:
+            item.failure_log_level = failure_log_level
+        if retry_policy_factory is not None:
+            item.retry_policy_factory = retry_policy_factory
+        if item.status != zc.async.interfaces.NEW:
             raise ValueError(
                 'cannot add already-assigned job')
         for name in item.quota_names:
@@ -253,10 +286,9 @@
             item.begin_after = now
         if begin_by is not None:
             item.begin_by = begin_by
-        elif item.begin_by is None:
-            item.begin_by = datetime.timedelta(hours=1) # good idea?
-        item.assignerUUID = zope.component.getUtility(
-            zc.async.interfaces.IUUID)
+        if item.assignerUUID is None: # rescheduled job keeps old UUID
+            item.assignerUUID = zope.component.getUtility(
+                zc.async.interfaces.IUUID)
         if item._p_jar is None:
             # we need to do this if the job will be stored in another
             # database as well during this transaction.  Also, _held storage
@@ -274,7 +306,35 @@
         self._length.change(1)
         return item
 
+    def putBack(self, item):
+        # an agent has claimed a job, but now the job needs to be returned. the
+        # only current caller for this is a job's ``handleInterrupt`` method.
+        # The scenario for this is that the agent's dispatcher died while the
+        # job was active, interrupting the work; and the job's retry policy
+        # asks that the job be put back on the queue to be claimed immediately.
+        # This method puts the job in a special internal queue that ``_iter``
+        # looks at first. This allows jobs to maintain their order, if needed,
+        # within a quota.
+        assert zc.async.interfaces.IJob.providedBy(item)
+        assert item.status == zc.async.interfaces.NEW, item.status
+        assert item.begin_after is not None
+        assert item._p_jar is not None
+        # to support legacy instances of the queue that were created before
+        # this functionality and its separate internal data structure were
+        # part of the code, we instantiate the _putback_queue when we first
+        # need it, here.
+        if self._putback_queue is None:
+            self._putback_queue = zc.queue.CompositeQueue()
+        self._putback_queue.put(item)
+        item.parent = self
+        self._length.change(1)
+
     def _iter(self):
+        putback_queue = self._putback_queue
+        if putback_queue: # not None and not empty
+            dq_pop = putback_queue.pull
+            for dq_ix, dq_next in enumerate(putback_queue):
+                yield dq_pop, dq_ix, dq_next
         queue = self._queue
         tree = self._held
         q = enumerate(queue)
@@ -331,16 +391,17 @@
         if not self._length():
             return default
         uuid = None
+        quotas_cleaned = set()
         for pop, ix, job in self._iter():
             if job.begin_after > now:
                 break
             res = None
             quotas = []
-            if (job.begin_after + job.begin_by) < now:
-                res = zc.async.interfaces.IJob(
-                        job.fail) # specify TimeoutError?
+            if (job.begin_by is not None and
+                (job.begin_after + job.begin_by) < now):
+                res = zc.async.interfaces.IJob(job.fail)
+                res.args.append(zc.async.interfaces.TimeoutError())
                 res.begin_after = now
-                res.begin_by = datetime.timedelta(hours=1)
                 res.parent = self
                 if uuid is None:
                     uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
@@ -349,8 +410,10 @@
                 for name in job.quota_names:
                     quota = self.quotas.get(name)
                     if quota is not None:
-                        quota.clean()
-                        if quota.filled:
+                        if name not in quotas_cleaned:
+                            quota.clean()
+                            quotas_cleaned.add(name)
+                        if quota.filled and job not in quota:
                             break
                         quotas.append(quota)
                 else:
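
As a reading aid for the queue.py changes above, here is a minimal stand-alone
sketch; it is not the zc.async implementation. It assumes plain lists in place
of the persistent zc.queue structures, and the names ``SketchQuota``,
``SketchQueue`` and ``put_back`` are hypothetical. It illustrates two ideas
from the diff: put-back jobs are offered before ordinary queued jobs, and a
filled quota does not block a job that already holds a slot in it (checked by
identity).

```python
class SketchQuota(object):
    """Hypothetical stand-in for a quota: a bounded list of jobs."""

    def __init__(self, size):
        self.size = size
        self._data = []

    @property
    def filled(self):
        return len(self._data) >= self.size

    def __contains__(self, item):
        # Identity, not equality, mirroring the ``is`` test in the diff.
        return any(i is item for i in self._data)

    def add(self, item):
        if item in self:
            return  # the job already holds a slot; nothing to do
        if self.filled:
            raise ValueError('Quota is filled')
        self._data.append(item)


class SketchQueue(object):
    """Hypothetical stand-in for the queue, with one quota for all jobs."""

    def __init__(self, quota):
        self.quota = quota
        self._queue = []    # ordinary jobs
        self._putback = []  # interrupted jobs returned for immediate retry

    def put(self, job):
        self._queue.append(job)

    def put_back(self, job):
        self._putback.append(job)

    def _iter(self):
        # Put-back jobs come first, then the ordinary queue.
        for source in (self._putback, self._queue):
            for ix, job in enumerate(source):
                yield source, ix, job

    def claim(self):
        for source, ix, job in self._iter():
            if self.quota.filled and job not in self.quota:
                continue  # quota is full of other jobs; skip this one
            self.quota.add(job)
            return source.pop(ix)
        return None
```

In this sketch, a job that was claimed, interrupted and put back is claimed
again ahead of everything else, even though its quota still counts as filled.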

Modified: zc.async/trunk/src/zc/async/queue.txt
===================================================================
--- zc.async/trunk/src/zc/async/queue.txt	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/queue.txt	2008-06-20 01:18:18 UTC (rev 87584)
@@ -24,7 +24,7 @@
 [#queues_collection]_ As shown in the README.txt of this package (or see
 zc.async.adapters.defaultQueueAdapter), the queue with the name '' will
 typically be registered as an adapter to persistent objects that
-provides zc.async.interfaces.IQueue [#verify]_. 
+provides zc.async.interfaces.IQueue [#verify]_.
 
 The queue doesn't have any jobs yet.
 
@@ -82,25 +82,27 @@
     >>> job.parent is queue
     True
     >>> transaction.commit()
-    
+
 A job added without any special calls gets a `begin_after` attribute
 of now.
 
     >>> import datetime
     >>> import pytz
-    >>> now = datetime.datetime.now(pytz.UTC) 
+    >>> now = datetime.datetime.now(pytz.UTC)
     >>> now
     datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=<UTC>)
     >>> job.begin_after == now
     True
 
-A ``begin_by`` attribute is a duration, and defaults to one hour.  This
-means that it must be completed an hour after the ``begin_after`` datetime,
-or else the system will fail it.
+A ``begin_by`` attribute is a duration (datetime.timedelta) or None, and
+defaults to None.
 
-    >>> job.begin_by == datetime.timedelta(hours=1)
+    >>> job.begin_by == None
     True
 
+If ``begin_by`` were an hour, that would mean that the job must be begun within
+an hour of the ``begin_after`` datetime, or else the system will fail it.
+
 Now let's add a job to be performed later, using ``begin_after``.
 
 This means that it's immediately ready to be performed: we can ``claim`` it.
@@ -240,9 +242,9 @@
 - If begin_after + begin_by >= now, a job that makes the original job fail
   is used instead.
 
-- If a job has one or more ``quota_names`` and the associated quotas are
-  filled with jobs not in the CALLBACKS or COMPLETED status then it will
-  not be returned.
+- If a job is not failing and has one or more ``quota_names`` and the
+  associated quotas are filled with jobs not in the CALLBACKS or COMPLETED
+  status then it will not be returned.
 
 - It does not take an index argument.
 
@@ -391,22 +393,46 @@
     ...     @property
     ...     def queue(self):
     ...         return self.parent.parent
-    ...     def claimJob(self):
+    ...     def claimJob(self, filter=None):
     ...         if len(self) < self.size:
-    ...             job = self.queue.claim()
+    ...             job = self.queue.claim(filter=filter)
     ...             if job is not None:
     ...                 job.parent = self
     ...                 self.append(job)
     ...                 return job
-    ...     def pull(self):
-    ...         return self.pop(0)
+    ...     def pull(self, index=0):
+    ...         res = self.pop(index)
+    ...         res.parent = None
+    ...         return res
+    ...     def remove(self, item):
+    ...         self.pull(self.index(item))
     ...     def jobCompleted(self, job):
+    ...         persistent.list.PersistentList.remove(self, job)
     ...         self.completed.add(job)
-    ...         
+    ...     def reschedule(self, item, when):
+    ...         now = datetime.datetime.utcnow()
+    ...         if isinstance(when, datetime.datetime):
+    ...             self.remove(item)
+    ...             if when.tzinfo is not None:
+    ...                 when = when.astimezone(pytz.UTC).replace(tzinfo=None)
+    ...             if when <= now:
+    ...                 self.queue.disclaim(item)
+    ...             else:
+    ...                 self.queue.put(item, begin_after=when)
+    ...         elif isinstance(when, datetime.timedelta):
+    ...             self.remove(item)
+    ...             if when <= datetime.timedelta():
+    ...                 self.queue.disclaim(item)
+    ...             else:
+    ...                 self.queue.put(item, begin_after=now+when)
+    ...         else:
+    ...             raise TypeError('``when`` must be datetime or timedelta')
+    ...
 
     >>> job4 is queue.claim()
     True
     >>> job4.parent = StubAgent()
+    >>> job4.parent.append(job4)
     >>> job4.status == zc.async.interfaces.ASSIGNED
     True
     >>> list(quota) == [job4]
@@ -426,23 +452,25 @@
 timed out are returned in wrappers that fail the original job.
 
     >>> job9_from_outer_space = queue.put(mock_work)
+    >>> job9_from_outer_space.begin_by = datetime.timedelta(hours=1)
     >>> zc.async.testing.set_now(
     ...     datetime.datetime.now(pytz.UTC) +
     ...     job9_from_outer_space.begin_by + datetime.timedelta(seconds=1))
     >>> job9 = queue.claim()
     >>> job9 is job9_from_outer_space
     False
-    >>> stub = root['stub'] = StubAgent() 
+    >>> stub = root['stub'] = StubAgent()
     >>> job9.parent = stub
+    >>> stub.append(job9)
     >>> transaction.commit()
     >>> job9()
     >>> job9_from_outer_space.status == zc.async.interfaces.COMPLETED
     True
     >>> print job9_from_outer_space.result.getTraceback()
+    ... # doctest: +NORMALIZE_WHITESPACE
     Traceback (most recent call last):
-    Failure: zc.async.interfaces.AbortedError: 
+    Failure: zc.async.interfaces.TimeoutError:
     <BLANKLINE>
-    
 
 Dispatchers
 ===========
@@ -492,12 +520,12 @@
 
     >>> print da.activated
     None
-    >>> print da.last_ping
+    >>> print da.last_ping.value
     None
 
-When the object's ``last_ping`` + ``ping_interval`` is greater than now,
-a new ``last_ping`` should be recorded, as we'll see below.  If the
-``last_ping`` (or ``activated``, if more recent) +
+When the object's ``last_ping.value`` + ``ping_interval`` is at or before now,
+a new ``last_ping.value`` should be recorded, as we'll see below.  If the
+``last_ping.value`` (or ``activated``, if more recent) +
 ``ping_death_interval`` is older than now, the dispatcher is considered to
 be ``dead``.
 
@@ -514,7 +542,7 @@
     >>> import pytz
     >>> now = datetime.datetime.now(pytz.UTC)
     >>> da.activate()
-    >>> now <= da.activated <= datetime.datetime.now(pytz.UTC) 
+    >>> now <= da.activated <= datetime.datetime.now(pytz.UTC)
     True
 
 It's still not dead. :-)
@@ -638,10 +666,10 @@
 
     >>> before = datetime.datetime.now(pytz.UTC)
     >>> pollStub(conn)
-    >>> before <= da.last_ping <= datetime.datetime.now(pytz.UTC)
+    >>> before <= da.last_ping.value <= datetime.datetime.now(pytz.UTC)
     True
 
-We don't have any jobs to claim yet.  Let's add one and do it again. 
+We don't have any jobs to claim yet.  Let's add one and do it again.
 We'll use a test fixture, time_flies, to make the time change.
 
     >>> job10 = queue.put(mock_work)
@@ -652,10 +680,10 @@
     ...         datetime.timedelta(seconds=seconds))
     ...
 
-    >>> last_ping = da.last_ping
+    >>> last_ping = da.last_ping.value
     >>> time_flies(5)
     >>> pollStub(conn)
-    >>> da.last_ping == last_ping
+    >>> da.last_ping.value == last_ping
     True
 
     >>> len(jobs_to_do)
@@ -669,11 +697,11 @@
 
     >>> time_flies(10)
     >>> pollStub(conn)
-    >>> da.last_ping == last_ping
+    >>> da.last_ping.value == last_ping
     True
     >>> time_flies(15)
     >>> pollStub(conn)
-    >>> da.last_ping > last_ping
+    >>> da.last_ping.value > last_ping
     True
 
 Dead Dispatchers
@@ -684,7 +712,7 @@
 then proceed.  But what if a queue has more than one simultaneous
 dispatcher?  How do we know to clean out the dead dispatcher's jobs?
 
-The ``ping`` method not only changes the ``last_ping`` but checks the
+The ``ping`` method not only changes the ``last_ping.value`` but checks the
 next sibling dispatcher, as defined by UUID, to make sure that it is not
 dead. It uses the ``dead`` attribute, introduced above, to test whether
 the sibling is alive.
@@ -708,8 +736,8 @@
     False
 
 Let's do that again, with an agent in the new dispatcher and some jobs in the
-agent.  Assigned jobs will be reassigned; in-progress jobs will have a
-new task that fails them; callback jobs will resume their callback; and
+agent. Assigned jobs will be reassigned; in-progress jobs will have a new task
+that calls ``handleInterrupt``; callback jobs will resume their callback; and
 completed jobs will be moved to the completed collection.
 
     >>> alt_agent = alt_da['main'] = StubAgent()
@@ -755,16 +783,385 @@
     4
     >>> queue[1] is jobA
     True
-    >>> queue[2].callable == jobB.fail
+    >>> queue[2].callable == jobB.handleInterrupt
     True
     >>> queue[3].callable == jobC.resumeCallbacks
     True
     >>> alt_agent.completed.first() is jobD
     True
 
+As an aside, notice that the ``handleInterrupt`` and ``resumeCallbacks`` jobs
+have custom error log levels, and custom retry policies.
+
+    >>> import logging
+    >>> queue[2].failure_log_level == logging.CRITICAL
+    True
+    >>> queue[3].failure_log_level == logging.CRITICAL
+    True
+    >>> queue[2].retry_policy_factory is zc.async.job.RetryCommonForever
+    True
+    >>> queue[3].retry_policy_factory is zc.async.job.RetryCommonForever
+    True
+
 If you have multiple workers, it is strongly suggested that you get the
 associated servers connected to a shared time server.
 
+Rescheduled Jobs
+----------------
+
+When a job is interrupted--it was active when it stopped--it asks its retry
+policy whether to retry.  The policy can respond in one of three ways:
+
+- yes, please retry right now;
+
+- yes, please retry later; or
+
+- no, please do not retry, and fail instead.
+
+All three of these possibilities potentially affect the queue in different
+ways, so we will look at examples.
+
+First, let's look at the simple cases for all three.
+
+The scenario above lets us look at what is probably the most common case: the
+default retry policy.  Let's claim the ``handleInterrupt`` task.
+
+    >>> len(queue)
+    4
+    >>> j = alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    >>> j.callable == jobB.handleInterrupt
+    True
+    >>> len(queue)
+    3
+
+The default retry policy's behavior is to retry nine times, for a total of ten
+attempts, and to request that the original job be retried as soon as
+possible--first in line. That behavior will become more interesting when we
+look at interaction with quotas below. But now, performing this job will push
+jobB back in the queue--again, notably, in very first place in the queue.
+
+    >>> j()
+    >>> len(queue)
+    4
+    >>> queue[0] is jobB
+    True
+    >>> jobB is alt_agent.claimJob()
+    True
+    >>> jobB()
+    42
+    >>> len(queue)
+    3
+
+That was an example of the retry policy saying "please retry right now":
+returning ``True``. Now let's look at the other two possible responses.
+
+"please retry later": with a datetime.
+
+    >>> import persistent
+    >>> import datetime
+    >>> class StubRetryPolicy(persistent.Persistent):
+    ...     zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+    ...     _reply = datetime.datetime(3000, 1, 1)
+    ...     def __init__(self, job):
+    ...         self.parent = self.__parent__ = job
+    ...     def jobError(self, failure, data_cache):
+    ...         return self._reply
+    ...     def commitError(self, failure, data_cache):
+    ...         return self._reply
+    ...     def interrupted(self):
+    ...         return self._reply
+    ...     def updateData(self, data_cache):
+    ...         pass
+    ...
+
+    >>> j = zc.async.job.Job(mock_work)
+    >>> j._status = zc.async.interfaces.ACTIVE # hack internals for test
+    >>> j.retry_policy_factory = StubRetryPolicy
+    >>> j_wrap = queue.put(j.handleInterrupt)
+    >>> len(queue)
+    4
+    >>> j_wrap is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    True
+    >>> len(queue)
+    3
+    >>> j_wrap()
+    >>> len(queue)
+    4
+    >>> queue[3] is j
+    True
+    >>> j.begin_after
+    datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+...with a timedelta.
+
+    >>> j = zc.async.job.Job(mock_work)
+    >>> j._status = zc.async.interfaces.ACTIVE # hack internals for test
+    >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j.retry_policy_factory = StubRetryPolicy
+    >>> j_wrap = queue.put(j.handleInterrupt)
+    >>> len(queue)
+    5
+    >>> j_wrap is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    True
+    >>> len(queue)
+    4
+    >>> j_wrap()
+    >>> len(queue)
+    5
+    >>> queue[3] is j
+    True
+    >>> j.begin_after
+    datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+"please do not retry": does not affect the queue, so we will not show it here.
+
+The trickiest aspects occur with quotas.
+
+"please retry now": with a quota.  In this case, the job should retain its
+precise place in the quota if the quota has a limit of one, and should remain
+in the quota if the quota has a greater limit.
+
+    >>> quota = queue.quotas['content catalog']
+    >>> quota[0]()
+    42
+    >>> quota.clean()
+    >>> len(quota)
+    0
+    >>> j = queue.put(mock_work)
+    >>> j.quota_names = ('content catalog',)
+    >>> j2 = queue.put(mock_work)
+    >>> j2.quota_names = ('content catalog',)
+    >>> transaction.commit()
+    >>> len(queue)
+    7
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    True
+    >>> list(quota) == [j]
+    True
+
+    >>> j._status = zc.async.interfaces.ACTIVE # fake beginning to call
+    >>> alt_agent.remove(j)
+    >>> j_wrap = queue.put(j.handleInterrupt)
+
+The ``j`` job still claims the space in the quota, so ``j2`` can't be
+claimed.
+
+    >>> print alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    None
+
+Now we'll get the ``handleInterrupt`` task.  The ``j`` job still cannot be
+removed from the quota.
+
+    >>> j_wrap is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    True
+    >>> len(quota)
+    1
+    >>> quota.clean()
+    >>> list(quota) == [j]
+    True
+
+Now we'll perform the task.  The ``j`` job returns to the queue in a
+``PENDING`` status, and the quota still cannot be cleared.
+
+    >>> j_wrap()
+    >>> len(queue)
+    7
+    >>> queue[0] is j
+    True
+    >>> len(quota)
+    1
+    >>> quota.clean()
+    >>> list(quota) == [j]
+    True
+
+The agent can claim job ``j`` again, however.
+
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    True
+
+Still cannot clear the quota.
+
+    >>> len(quota)
+    1
+    >>> quota.clean()
+    >>> list(quota) == [j]
+    True
+
+Once we perform j, the quota can be cleared, and j2 can be claimed, as usual.
+
+    >>> j()
+    42
+
+    >>> j2 is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    True
+
+"please retry later": with a quota.  In this case, other jobs can take the
+place of the original job in the quota.
+
+    >>> j3 = queue.put(mock_work)
+    >>> j3.quota_names = ('content catalog',)
+    >>> transaction.commit()
+    >>> j2.retry_policy_factory = StubRetryPolicy
+    >>> list(quota) == [j2]
+    True
+
+    >>> j2._status = zc.async.interfaces.ACTIVE # fake beginning to call
+    >>> alt_agent.remove(j2)
+    >>> j2_wrap = queue.put(j2.handleInterrupt)
+    >>> j2_wrap is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    True
+    >>> len(queue)
+    6
+    >>> j2_wrap()
+    >>> len(queue)
+    7
+    >>> queue[-2] is j2
+    True
+    >>> j2.status == zc.async.interfaces.PENDING
+    True
+
+    >>> j3 is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    True
+    >>> j3()
+    42
+
+A retry policy can also request rescheduling from a jobError or a commitError,
+although the default retry policies do not.
+
+jobError: "please reschedule now"
+
+    >>> def raiseAnError():
+    ...     raise RuntimeError
+    ...
+    >>> StubRetryPolicy._reply = datetime.timedelta()
+    >>> j = queue.put(raiseAnError, retry_policy_factory=StubRetryPolicy)
+    >>> len(queue)
+    7
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'raiseAnError')
+    ...
+    True
+    >>> len(queue)
+    6
+    >>> j() is j
+    True
+    >>> len(queue)
+    7
+    >>> queue.claim() is j
+    True
+
+jobError: "please reschedule later"
+
+    >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j = queue.put(raiseAnError, retry_policy_factory=StubRetryPolicy)
+    >>> len(queue)
+    7
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'raiseAnError')
+    ...
+    True
+    >>> len(queue)
+    6
+    >>> j() is j
+    True
+    >>> len(queue)
+    7
+    >>> queue[-2] is j
+    True
+    >>> j.begin_after
+    datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+commitError: "please reschedule now"
+
+    >>> error_flag = False
+    >>> old_commit = transaction.TransactionManager.commit
+    >>> def commit(self):
+    ...     global error_flag
+    ...     if error_flag:
+    ...         error_flag = False
+    ...         raise ValueError()
+    ...     old_commit(self)
+    ...
+    >>> transaction.TransactionManager.commit = commit
+    >>> def causeCommitError():
+    ...     global error_flag
+    ...     error_flag = True
+    ...
+    >>> StubRetryPolicy._reply = datetime.timedelta()
+    >>> j = queue.put(causeCommitError, retry_policy_factory=StubRetryPolicy)
+    >>> len(queue)
+    8
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'causeCommitError')
+    ...
+    True
+    >>> len(queue)
+    7
+    >>> j() is j
+    True
+    >>> len(queue)
+    8
+    >>> queue.claim() is j
+    True
+
+commitError: "please reschedule later"
+
+    >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j = queue.put(causeCommitError, retry_policy_factory=StubRetryPolicy)
+    >>> len(queue)
+    8
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'causeCommitError')
+    ...
+    True
+    >>> len(queue)
+    7
+    >>> j() is j
+    True
+    >>> len(queue)
+    8
+    >>> queue[-2] is j
+    True
+    >>> j.begin_after
+    datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+
+    >>> transaction.TransactionManager.commit = old_commit
+
 =========
 Footnotes
 =========
@@ -801,12 +1198,12 @@
     Traceback (most recent call last):
     ...
     KeyError: 'foo'
-    
+
     >>> container['foo'] = None
     Traceback (most recent call last):
     ...
     ValueError: value must be IQueue
-    
+
     >>> del container['']
     >>> len(container)
     0
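
The "Rescheduled Jobs" doctests added to queue.txt above exercise three kinds
of retry policy reply: retry now, retry later, and do not retry. The following
stand-alone sketch shows one way to classify such a reply. It is not the
zc.async code: the function name ``interpret_retry_reply`` is hypothetical,
treating ``False`` as "fail" and naive datetimes as UTC are assumptions, and
the standard library's ``datetime.timezone.utc`` is used in place of pytz.

```python
import datetime

UTC = datetime.timezone.utc


def interpret_retry_reply(reply, now):
    """Classify a retry policy reply as (action, when).

    ``action`` is 'now', 'later' or 'fail'; ``when`` is an aware datetime
    for 'later' and None otherwise.  ``now`` must be an aware datetime.
    """
    # Booleans are checked first so they are never confused with other types.
    if reply is True:
        return ('now', None)
    if reply is False:  # assumption: False means "do not retry"
        return ('fail', None)
    if isinstance(reply, datetime.timedelta):
        if reply <= datetime.timedelta():
            return ('now', None)  # zero or negative delay: retry immediately
        return ('later', now + reply)
    if isinstance(reply, datetime.datetime):
        if reply.tzinfo is None:
            reply = reply.replace(tzinfo=UTC)  # assumption: naive means UTC
        if reply <= now:
            return ('now', None)
        return ('later', reply)
    raise TypeError('reply must be a bool, datetime or timedelta')
```

A caller would then put a 'now' job back at the front of the queue, put a
'later' job on the queue with ``begin_after`` set to ``when``, and fail the
job otherwise.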

Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/testing.py	2008-06-20 01:18:18 UTC (rev 87584)
@@ -13,7 +13,9 @@
 ##############################################################################
 import threading
 import bisect
-import time
+from time import sleep as time_sleep # this import style is intentional, so
+# that test monkeypatching of time.sleep does not affect the usage in this
+# module
 import datetime
 
 import pytz
@@ -51,6 +53,9 @@
         raw = super(_datetime, self).__repr__()
         return "datetime.datetime%s" % (
             raw[raw.index('('):],)
+    def __add__(self, other):
+        return _datetime(
+            *super(_datetime,self).__add__(other).__reduce__()[1])
     def __reduce__(self):
         return (argh, super(_datetime, self).__reduce__()[1])
 def argh(*args, **kwargs):
@@ -103,7 +108,7 @@
         assert _when == 'before' and _event == 'shutdown', (
             'unsupported trigger')
         self.triggers.append((_when, _event, _callable, args, kwargs))
-    
+
     def callInThread(self, _callable, *args, **kw):
         # very naive should be fine...
         thread = threading.Thread(target=_callable, args=args, kwargs=kw)
@@ -177,7 +182,7 @@
                     break
             else:
                 break
-            time.sleep(0.1)
+            time_sleep(0.1)
         else:
             print 'TIME OUT'
 
@@ -189,7 +194,7 @@
     for i in range(seconds * 10):
         if len(dispatcher.polls) > count:
             return dispatcher.polls.first()
-        time.sleep(0.1)
+        time_sleep(0.1)
     else:
         assert False, 'no poll!'
 
@@ -198,7 +203,7 @@
         t = transaction.begin()
         if job.status == zc.async.interfaces.COMPLETED:
             return job.result
-        time.sleep(0.1)
+        time_sleep(0.1)
     else:
         assert False, 'job never completed'
 
@@ -208,6 +213,6 @@
         t = transaction.begin()
         if name in job.annotations:
             return job.annotations[name]
-        time.sleep(0.1)
+        time_sleep(0.1)
     else:
         assert False, 'annotation never found'
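
The comment on the new import in testing.py says the ``from time import
sleep`` style is intentional. The small sketch below, which is illustrative
only and uses a hypothetical ``patched_sleep`` function, shows why: a name
bound with ``from ... import`` keeps pointing at the original function even
after a test replaces ``time.sleep`` on the module.

```python
import time
from time import sleep as time_sleep  # bound once, at import time


def patched_sleep(seconds):
    # Hypothetical replacement that a test might install.
    raise AssertionError('tests must not really sleep')


original = time.sleep
time.sleep = patched_sleep
try:
    time_sleep(0)  # still the real sleep; the patch does not affect it
    try:
        time.sleep(0)  # looked up on the module, so the patch is used
        patched_called = False
    except AssertionError:
        patched_called = True
finally:
    time.sleep = original  # always restore the real function
```

Code that calls ``time.sleep(...)`` through the module attribute would pick up
the replacement, which is exactly what the helper functions in testing.py need
to avoid.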

Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py	2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/utils.py	2008-06-20 01:18:18 UTC (rev 87584)
@@ -14,14 +14,27 @@
 import datetime
 import logging
 import sys
+import time
 
+import ZEO.Exceptions
+import ZODB.POSException
 import rwproperty
 import persistent
+import zope.minmax
 import zc.dict
 import pytz
 import zope.bforest.periodic
 
 
+EXPLOSIVE_ERRORS = (SystemExit, KeyboardInterrupt)
+
+SYSTEM_ERRORS = (ZEO.Exceptions.ClientDisconnected,
+                 ZODB.POSException.POSKeyError)
+
+INITIAL_BACKOFF = 5
+MAX_BACKOFF = 60
+BACKOFF_INCREMENT = 5
+
 def simpleWrapper(name):
     # notice use of "simple" in function name!  A sure sign of trouble!
     def wrapper(self, *args, **kwargs):
@@ -47,39 +60,12 @@
     def __parent__(self, value):
         self._z_parent__ = None
 
+# for legacy databases
+Atom = zope.minmax.Maximum
 
-class Atom(persistent.Persistent):
-    def __init__(self, value):
-        self.value = value
 
-    def __getstate__(self):
-        return self.value
+class Dict(zc.dict.Dict, Base):
 
-    def __setstate__(self, state):
-        self.value = state
-
-class AtomDescriptor(object):
-    def __init__(self, name, initial=None):
-        self.name = name
-        self.initial = initial
-
-    def __get__(self, obj, klass=None):
-        if obj is None:
-            return self
-        return obj.__dict__[self.name].value
-
-    def __set__(self, obj, value):
-        obj.__dict__[self.name].value = value
-
-    def initialize(self, obj):
-        obj.__dict__[self.name] = Atom(self.initial)
-
-def createAtom(name, initial):
-    sys._getframe(1).f_locals[name] = AtomDescriptor(name, initial)
-
-
-class Dict(zc.dict.Dict, Base):
-    
     copy = None # mask
 
     def __setitem__(self, key, value):
@@ -236,3 +222,114 @@
 
     def __len__(self):
         return len(self._data)
+
+def never_fail(call, identifier, tm):
+    # forever for TransactionErrors; forever, with backoff, for anything else
+    trans_ct = 0
+    backoff_ct = 0
+    backoff = INITIAL_BACKOFF
+    res = None
+    while 1:
+        try:
+            res = call()
+            tm.commit()
+        except ZODB.POSException.TransactionError:
+            tm.abort()
+            trans_ct += 1
+            if not trans_ct % 5:
+                log.warning(
+                    '%d consecutive transaction errors while %s',
+                    trans_ct, identifier, exc_info=True)
+                res = None
+        except EXPLOSIVE_ERRORS:
+            tm.abort()
+            raise
+        except Exception, e:
+            if isinstance(e, SYSTEM_ERRORS):
+                level = logging.ERROR
+            else:
+                level = logging.CRITICAL
+            tm.abort()
+            backoff_ct += 1
+            if backoff_ct == 1:
+                # import pdb; pdb.set_trace()
+                log.log(level,
+                        'first error while %s; will continue in %d seconds',
+                        identifier, backoff, exc_info=True)
+            elif not backoff_ct % 10:
+                log.log(level,
+                        '%d consecutive errors while %s; '
+                        'will continue in %d seconds',
+                        backoff_ct, identifier, backoff, exc_info=True)
+            res = None
+            time.sleep(backoff)
+            backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
+        else:
+            return res
+
+def wait_for_system_recovery(call, identifier, tm):
+    # forever for TransactionErrors; forever, with backoff, for SYSTEM_ERRORS
+    trans_ct = 0
+    backoff_ct = 0
+    backoff = INITIAL_BACKOFF
+    res = None
+    while 1:
+        try:
+            res = call()
+            tm.commit()
+        except ZODB.POSException.TransactionError:
+            tm.abort()
+            trans_ct += 1
+            if not trans_ct % 5:
+                log.warning(
+                    '%d consecutive transaction errors while %s',
+                    trans_ct, identifier, exc_info=True)
+                res = None
+        except EXPLOSIVE_ERRORS:
+            tm.abort()
+            raise
+        except SYSTEM_ERRORS:
+            tm.abort()
+            backoff_ct += 1
+            if backoff_ct == 1:
+                log.error('first error while %s; will continue in %d seconds',
+                          identifier, backoff, exc_info=True)
+            elif not backoff_ct % 5:
+
+                log.error('%d consecutive errors while %s; '
+                          'will continue in %d seconds',
+                          backoff_ct, identifier, backoff, exc_info=True)
+            res = None
+            time.sleep(backoff)
+            backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
+        except:
+            log.error('Error while %s', identifier, exc_info=True)
+            tm.abort()
+            return zc.twist.Failure()
+        else:
+            return res
+
+def try_transaction_five_times(call, identifier, tm):
+    ct = 0
+    res = None
+    while 1:
+        try:
+            res = call()
+            tm.commit()
+        except ZODB.POSException.TransactionError:
+            tm.abort()
+            ct += 1
+            if ct >= 5:
+                log.error('Five consecutive transaction errors while %s',
+                          identifier, exc_info=True)
+                res = zc.twist.Failure()
+            else:
+                continue
+        except EXPLOSIVE_ERRORS:
+            tm.abort()
+            raise
+        except:
+            tm.abort()
+            log.error('Error while %s', identifier, exc_info=True)
+            res = zc.twist.Failure()
+        return res
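
Note on the retry helpers above: never_fail and wait_for_system_recovery
sleep for `backoff` seconds after each non-transaction error, then raise
the delay by BACKOFF_INCREMENT up to MAX_BACKOFF. The sketch below
reproduces only that arithmetic so the resulting schedule is easy to see.
The constants are copied from the diff; the function name backoff_delays
is hypothetical and is not part of zc.async.

```python
# Constants as added to zc/async/utils.py in this revision.
INITIAL_BACKOFF = 5
MAX_BACKOFF = 60
BACKOFF_INCREMENT = 5


def backoff_delays(failures):
    """Return the sleep durations for `failures` consecutive errors.

    Hypothetical helper for illustration only: it mirrors the
    `backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)` update
    without calling anything or sleeping.
    """
    delays = []
    backoff = INITIAL_BACKOFF
    for _ in range(failures):
        delays.append(backoff)  # the loop sleeps for the current value
        backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
    return delays


if __name__ == '__main__':
    print(backoff_delays(14))
```

Under these constants the delay grows linearly and reaches the 60-second
cap on the twelfth consecutive error. Transaction errors are retried
immediately and do not advance this schedule.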


