[Checkins] SVN: zc.async/branches/dev/s checkpoint; close to release, just needs some clean-up

Gary Poster gary at zope.com
Mon Jun 16 22:38:08 EDT 2008


Log message for revision 87444:
  checkpoint; close to release, just needs some clean-up

Changed:
  U   zc.async/branches/dev/setup.py
  U   zc.async/branches/dev/src/zc/async/CHANGES.txt
  U   zc.async/branches/dev/src/zc/async/README.txt
  U   zc.async/branches/dev/src/zc/async/TODO.txt
  U   zc.async/branches/dev/src/zc/async/catastrophes.txt
  U   zc.async/branches/dev/src/zc/async/configure.py
  U   zc.async/branches/dev/src/zc/async/dispatcher.py
  U   zc.async/branches/dev/src/zc/async/dispatcher.txt
  U   zc.async/branches/dev/src/zc/async/interfaces.py
  U   zc.async/branches/dev/src/zc/async/job.py
  U   zc.async/branches/dev/src/zc/async/job.txt
  U   zc.async/branches/dev/src/zc/async/queue.py
  U   zc.async/branches/dev/src/zc/async/queue.txt
  U   zc.async/branches/dev/src/zc/async/testing.py
  U   zc.async/branches/dev/src/zc/async/utils.py

-=-
Modified: zc.async/branches/dev/setup.py
===================================================================
--- zc.async/branches/dev/setup.py	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/setup.py	2008-06-17 02:38:06 UTC (rev 87444)
@@ -18,7 +18,6 @@
 try:
     import docutils
 except ImportError:
-    import warnings
     def validateReST(text):
         return ''
 else:
@@ -97,13 +96,14 @@
         'zc.dict>=1.2.1',
         'zc.twist>=1.2',
         'Twisted>=8.0.1', # 8.0 was setuptools compatible, 8.0.1 had bugfixes.
-        # note that Twisted builds with warnings, at least with py2.4.  It
+        # note that Twisted builds with warnings with py2.4.  It
         # seems to still build ok.
         'zope.bforest>=1.2',
         'zope.component',
         'zope.event',
         'zope.i18nmessageid',
         'zope.interface',
+        'zope.minmax',
         'zope.testing',
         'rwproperty',
         ],

Modified: zc.async/branches/dev/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/CHANGES.txt	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/CHANGES.txt	2008-06-17 02:38:06 UTC (rev 87444)
@@ -1,6 +1,57 @@
 1.2 (unreleased)
 ================
 
+- made the log for finding an activated agent report the pertinent queue's oid
+  as an unpacked integer, rather than the packed string blob.  As mentioned in
+  the 1.1.1 changes below, use ``ZODB.utils.p64`` to convert back to an oid
+  that the ZODB will recognize.
+
+- Bugfix: in failing a job, the job thought it was in its old agent, and the
+  ``fail`` call failed. This is now tested by the first example in new doctest
+  ``catastrophes.txt``.
+
+- jobs no longer default to a ``begin_by`` value of one hour after the
+  ``begin_after``.  The default now is no limit.
+
+- Made dispatcher much more robust to transaction errors and ZEO
+  ClientDisconnected errors.
+
+- Jobs now use an IRetryPolicy to decide what to do on failure within a job,
+  within the commit of the result, and if the job is interrupted.  This allows
+  support of transactional jobs, transactional jobs that critically must be
+  run to completion, and non-transactional jobs such as communicating with an
+  external service.
+
+- The default retry policy supports retries for ClientDisconnected errors,
+  transaction errors, and interruptions.
+
+- ``job.txt`` has been expanded significantly to show error handling and the
+  use of retry policies. New file ``catastrophes.txt`` shows handling of other
+  catastrophes, such as interruptions to polling.
+
+- job errors now go in the main zc.async.event log rather than in the
+  zc.async.trace log.  Successes continue to go in the trace log.
+
+- callback failures go to the main log as a CRITICAL error, by default.
+
+- ``handleInterrupt`` is the new protocol on jobs to inform them that they were
+  active in a dispatcher that is now dead. They either fail or reschedule,
+  depending on the associated IRetryPolicy for the job. If they reschedule,
+  the policy should return either a datetime or a timedelta. The job then calls
+  the agent's ``reschedule`` method. If the timedelta is empty or negative, or
+  the datetime is earlier than now, the job is put back in the queue with a new
+  ``putBack`` method on the queue. This is intended to be the opposite of
+  ``claim``. Jobs put in the queue with ``putBack`` will be pulled out before
+  any others.
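+
+  For illustration only (a sketch, not taken from the package's tested
+  documents), a policy hook that wants the interrupted job put back at the
+  front of the queue can simply return an empty timedelta::
+
+      import datetime
+
+      def interrupted(self):
+          # an empty timedelta asks for an immediate re-queue via ``putBack``;
+          # a positive timedelta or a datetime schedules the job for later
+          return datetime.timedelta()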
+
+- convert to using zope.minmax rather than locally defined ``Atom``.
+
+- Fix (and simplify) last_ping code so as to avoid unnecessarily writing the
+  state of the parent DispatcherAgents collection to the database whenever the
+  atom changed.
+
+1.1.1 (2008-05-14)
+==================
+
 - more README tweaks.
 
 - converted all reports from the dispatcher, including the monitor output,
@@ -21,15 +72,6 @@
   Fixed, which also means we need a new version of zope.bforest (1.2) for a new
   feature there.
 
-- made the log for finding an activated agent report the pertinent queue's oid
-  as an unpacked integer, rather than the packed string blob.  As mentioned
-  above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
-  recognize.
-
-- Bugfix: in failing a job, the job thought it was in its old agent, and the
-  ``fail`` call failed. This is now tested by the first example in new doctest
-  ``catastrophes.txt``.
-
 1.1 (2008-04-24)
 ================
 
@@ -49,7 +91,7 @@
 
 - Had the ThreadedDispatcherInstaller subscriber stash the thread on the
   dispatcher, so you can shut down tests like this:
-  
+
   >>> import zc.async.dispatcher
   >>> dispatcher = zc.async.dispatcher.get()
   >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)

Modified: zc.async/branches/dev/src/zc/async/README.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README.txt	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/README.txt	2008-06-17 02:38:06 UTC (rev 87444)
@@ -1087,15 +1087,16 @@
 
     >>> t = transaction.begin()
     >>> job = queue.put(
-    ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
+    ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC),
+    ...     datetime.timedelta(hours=1))
     >>> transaction.commit()
     >>> reactor.wait_for(job)
     >>> job.result
-    <zc.twist.Failure zc.async.interfaces.AbortedError>
+    <zc.twist.Failure zc.async.interfaces.TimeoutError>
     >>> import sys
     >>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
     Traceback (most recent call last):
-    Failure: zc.async.interfaces.AbortedError:
+    Failure: zc.async.interfaces.TimeoutError:
 
 .. [#job] The Job class can take arguments and keyword arguments
     for the wrapped callable at call time as well, similar to Python

Modified: zc.async/branches/dev/src/zc/async/TODO.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/TODO.txt	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/TODO.txt	2008-06-17 02:38:06 UTC (rev 87444)
@@ -1,65 +1,10 @@
 Bugs and improvements:
 
-- need retry tasks, particularly for callbacks. ``retry_count`` affects aborts
-  and transaction failures?  None == infinity, which is what callbacks use?
-  Should retry have a cleanup function?
-
-- need CRITICAL logs for callbacks
-
-- when database went away, and then came back, async didn't come back.  Fix
-  (and also reconsider retry behavior in Dispatcher._commit and
-  AgentThreadPool.perform_thread).
-
-Traceback (most recent call last):
-  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/dispatcher.py", line 321, in _commit
-    transaction.commit()
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_manager.py", line 93, in commit
-    return self.get().commit()
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 325, in commit
-    self._commitResources()
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 422, in _commitResources
-    rm.tpc_begin(self)
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/Connection.py", line 525, in tpc_begin
-    self._normal_storage.tpc_begin(transaction)
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 1079, in tpc_begin
-    self._server.tpc_begin(id(txn), txn.user, txn.description,
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 86, in __getattr__
-    raise ClientDisconnected()
-ClientDisconnected
-
-Exception in thread Thread-5:
-Traceback (most recent call last):
-  File "/opt/cleanpython24/lib/python2.4/threading.py", line 442, in __bootstrap
-    self.run()
-  File "/opt/cleanpython24/lib/python2.4/threading.py", line 422, in run
-    self.__target(*self.__args, **self.__kwargs)
-  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/dispatcher.py", line 153, in perform_thread
-    job() # this does the committing and retrying, largely
-  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 290, in __call__
-    return self._call_with_retry(
-  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 321, in _call_with_retry
-    res = self._complete(zc.twist.Failure(), tm)
-  File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 340, in _complete
-    tm.commit()
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_manager.py", line 93, in commit
-    return self.get().commit()
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 325, in commit
-    self._commitResources()
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 422, in _commitResources
-    rm.tpc_begin(self)
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/Connection.py", line 525, in tpc_begin
-    self._normal_storage.tpc_begin(transaction)
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 1079, in tpc_begin
-    self._server.tpc_begin(id(txn), txn.user, txn.description,
-  File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 86, in __getattr__
-    raise ClientDisconnected()
-ClientDisconnected
-
+- need a way to tell the zc.twist.Partial used for callbacks to retry forever:
+  currently there is no spelling for this in zc.twist.  This makes using
+  deferreds less reliable: _callback *must* be called.
 - be even more pessimistic about memory for saved polls and job info in
   dispatcher.
-
-For future versions:
-
 - queues should be pluggable like agent with filter
 - show how to broadcast, maybe add conveniences
 - show how to use with collapsing jobs (hint to future self: use external queue
@@ -85,10 +30,17 @@
     Notice that all of the tests in this package use FileStorage.
   * callbacks should be very, very quick, and very reliable.  If you want to do
     something that might take a while, put another job in the queue
-  
 
+More docs:
+
+- custom retry policies, particularly for non-transactional tasks;
+
+- changing the default retry policy, per-process and per-agent; and
+
+- changing the default log level for queue jobs, callback jobs, and per-job.
+
+
 For some other package, maybe:
 
 - TTW Management and logging views, as in zasync (see goals in the "History"
   section of the README).
-  
\ No newline at end of file

Modified: zc.async/branches/dev/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/catastrophes.txt	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/catastrophes.txt	2008-06-17 02:38:06 UTC (rev 87444)
@@ -21,10 +21,9 @@
 predictable, and if we knew of a bug, we'd try to fix it, rather than discuss
 it here!
 
-Exploring polling exceptions and job related exceptions will illuminate the
-more specific catastrophes you may encounter, and how your code and zc.async's
-can work together to handle them.  We'll discuss each, then drill down into
-some specific scenarios.
+We'll discuss both polling exceptions and job-related exceptions, then drill
+down into some specific scenarios. This will illuminate how your code and
+zc.async's can work together to handle them.
 
 Polling Exceptions
 ------------------
@@ -117,7 +116,16 @@
 
 As discussed elsewhere, if your job fails in your own code, this is mostly
 your responsibility. You should handle possible errors both within your job's
-code, and in callbacks, as appropriate. zc.async's responsibilities are merely
+code, and in callbacks, as appropriate.
+
+The other tool at your disposal for this situation, as with others below, is a
+retry policy. Retry policies let you determine what zc.async should do when
+your job fails. The default retry policy for job failures (as well as commit
+failures, below) is that transaction errors, such as conflict errors, are
+retried four times (for a total of five attempts), and a ZEO
+ClientDisconnected error is retried forever with a backoff. You can customize
+these.
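+
+For example (an illustrative sketch, not one of the tested examples in this
+document; the ``retry_policy_factory`` attribute is an assumption here rather
+than something demonstrated in this file), a job that talks to an external
+service might be given the shipped "never retry" policy::
+
+    import transaction
+    import zc.async.job
+
+    # ``charge_credit_card`` is a hypothetical callable
+    job = queue.put(charge_credit_card)
+    # assumed attribute name; points the job at the NeverRetry policy
+    job.retry_policy_factory = zc.async.job.NeverRetry
+    transaction.commit()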
+
+Beyond supporting these tools, zc.async's only responsibilities are
 to report.
 
 By default, zc.async will log a failure of a job entered in a queue at the
@@ -129,11 +137,8 @@
 
 zc.async also includes a ``Failure`` object on the job as a result, to let you
 react to the problem in a callback, and analyze it later.  This is discussed in
-detail elsewhere.
+detail in other documents.
 
-The RetryPolicy, discussed later, can try to react to exceptions from the job's
-code, but the default policies included in the package do not.
-
 Process Ends
 ............
 
@@ -156,8 +161,8 @@
 fallout.
 
 As we'll see below, zc.async defaults to guessing that jobs placed directly in
-a queue are transactional, and can be restarted up to ten times; and that jobs
-used as callbacks are also transactional, and should be restarted until they
+a queue are transactional, and can be tried up to ten times; and that jobs
+used as callbacks are also transactional, and can be tried until they
 succeed.  The defaults can be changed and the behavior of an individual
 job can be changed.
 
@@ -171,24 +176,22 @@
 ClientDisconnected errors should often cause jobs to be aborted and restarted.
 However, if the job is not transactional, such as communicating with an
 external service, a simple abort and retry may be hazardous. Also, many jobs
-should be stopped if they retry on ConflictError more than some heuristic
-bellweather, with the logic that they may be simply doing something too
-problematic, and they are blocking other tasks from starting. But other jobs
-should be retried until they complete.
+should be stopped if they retry on ConflictError more than some number of
+  times--a heuristic bellwether--with the logic that they may be simply doing
+something too problematic, and they are blocking other tasks from starting. But
+other jobs should be retried until they complete.
 
 As mentioned above, zc.async defaults to guessing that jobs are transactional.
-Client Disconnected errors are retried forever.  Jobs placed in a queue retry
-ConflictErrors five times, while callbacks retry them forever, with a small
-backoff.  The defaults can be changed and the behavior of an individual
-job can be changed, using the RetryPolicy described below.
+Client Disconnected errors are retried forever, with a small backoff. Jobs
+placed in a queue retry transaction errors, such as ConflictErrors, four times,
+while callbacks retry them forever. The defaults can be changed and the
+behavior of an individual job can be changed, using the RetryPolicy described
+below.
 
-While custom RetryPolicies can try to handle other transaction errors, they are
-generally considered to be out of zc.async's domain and control.
-
 Summary of Job-Related Exceptions
 .................................
 
-If an exception handles in your job's code, zc.async will log it as an ERROR
+If an exception occurs in your job's code, zc.async will log it as an ERROR
 if a main queue job and as CRITICAL if it is a callback; and it will make the
 result of the call a ``Failure`` with error information, as shown elsewhere.
 Everything else is your responsibility, to be handled with try:except or
@@ -221,28 +224,64 @@
 --------------
 
 The rest of the document uses scenarios to illustrate how zc.async handles
-errors, and how you might want to configure retry policies.  Retry policies are
-given a job, and then exception information, and then can determine whether
-zc.async should retry the job. zc.async comes with three retry policies.
+errors, and how you might want to configure retry policies.
 
-- One is the absence of a retry policy: do not retry this job.  A value of
-  ``None`` represents this policy.  This is appropriate for tasks that are
-  not transactional.  They typically need to be handled with a callback.
+What is a retry policy?  It is used in three circumstances.
 
-- The default is a retry policy that retries after a restart up to four times,
-  retries a ConflictError up to four times, and retries a ClientDisconnected up
-  to four times.
+- When the job starts but fails to complete because the system is interrupted,
+  the job will try to call ``retry_policy.interrupted()`` to get a boolean as
+  to whether the job should be retried.
 
-- One is a retry policy that never gives up on restarts, ConflictErrors, or
-  ClientDisconnected errors: it has no limit but keeps trying forever.
+- When the code the job ran fails, the job will try to call
+  ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to whether
+  the job should be retried.
 
+- When the commit fails, the job will try to call
+  ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
+  whether the job should be retried.
+
+Why does this need to be a policy?  Can't it be a simpler arrangement?
+
+The heart of the problem is that different jobs need different error
+resolutions.
+
+In some cases, jobs may not be fully transactional.  For instance, the job
+may be communicating with an external system, such as a credit card system.
+The retry policy here should typically be "never": perhaps a callback should be
+in charge of determining what to do next.
+
+If a job is fully transactional, it can be retried.  But even then the desired
+behavior may differ.
+
+- In typical cases, some errors should simply cause a failure, while other
+  errors, such as database conflict errors, should cause a limited number of
+  retries.
+
+- In some jobs, conflict errors should be retried forever, because the job must
+  be run to completion or else the system should fall over. Callbacks that try
+  to handle errors themselves may take this approach, for instance.
+
+zc.async currently ships with three retry policies.
+
+1.  The default, appropriate for most fully transactional jobs, is
+    zc.async.job.RetryCommonFourTimes.
+
+2.  The other available (pre-written) option for transactional jobs is
+    zc.async.job.RetryCommonForever. Callbacks get this policy by default.
+
+    Both of these policies retry ZEO disconnects forever. They retry
+    interruptions and transaction errors, such as conflicts, either four times
+    (for a total of five attempts) or forever, respectively.
+
+3.  The last retry policy is zc.async.job.NeverRetry.  This is appropriate for
+    non-transactional jobs. You'll still typically need to handle errors in
+    your callbacks.
+
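+As a rough sketch of what a custom policy can look like (illustrative only:
+this is not one of this document's tested examples, and the constructor
+signature and persistence details are assumptions here), a policy for a
+non-transactional job that tolerates interruptions but nothing else might be
+written like this::
+
+    import datetime
+
+    class RetryInterruptionsOnly(object):
+        # sketch implementing the methods of zc.async.interfaces.IRetryPolicy
+
+        def __init__(self, job):  # assumed constructor signature
+            self.job = job
+            self.data = {}  # scratch space filled by ``updateData``
+
+        def jobError(self, failure, data_cache):
+            # never re-run the code: it may have had external side effects
+            return False
+
+        def commitError(self, failure, data_cache):
+            return False
+
+        def interrupted(self):
+            # an empty timedelta means "put the job back at the front of the
+            # queue," so it is tried again as soon as possible
+            return datetime.timedelta()
+
+        def updateData(self, data_cache):
+            self.data.update(data_cache)
+
+The shipped policies keep bookkeeping this way too; the default policy's
+``interruptions`` count, shown in the scenarios below, lives in that data.
+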
 Scenarios
 ---------
 
-We'll examine a number of scenarios, many of which combine problems in polling
-and in jobs.  In these scenarios, we'll refer to a job that was placed directly
-in a queue as a "queue job".  A job that was performed in any other way is a
-"callback job," because most often these are callbacks.
+We'll examine polling error scenarios and job error scenarios.
 
 - Polling errors
 
@@ -250,79 +289,22 @@
 
   * The system is polling and gets a ClientDisconnected error.
 
-- Internal job errors
+- Job errors
 
-  * A worker process is polling and working on a queue job. The queue job fails
-    internally.
-
-  * A worker process is polling and working on a callback job.  The callback job
-    fails internally.
-
-- Default retry policy
-
-  * A worker process is working on a job with the default retry policy and gets
-    a ConflictError during the commit.
-  
-  * A worker process is working on a job with the default retry policy and gets
-    a ClientDisconnected error.
-  
   * A worker process is working on a job with the default retry policy. The
     process dies gracefully and restarts.
-  
+
   * Like the previous scenario, a worker process is working on a job with the
     default retry policy. The process crashes hard (does not die gracefully)
     and restarts.
-  
+
   * Like the previous scenario, a worker process is working on a job with the
     default retry policy. The process crashes hard (does not die gracefully)
     and a sibling notices and takes over.
 
-- No retry policy
+  * A worker process is working on a job with the default retry policy and gets
+    an error during the job or the commit.
 
-  * A worker process is working on a job without a retry policy (that is,
-    zc.async should not retry it) and gets a ConflictError during the commit.
-  
-  * A worker process is working on a job without a retry policy (that is,
-    zc.async should not retry it) and gets a ClientDisconnected error.
-  
-  * A worker process is working on a job with no retry policy (that is,
-    zc.async should not retry it). The process dies gracefully and restarts.
-  
-  * Like the previous scenario, a worker process is working on a job with no
-    retry policy (that is, zc.async should not retry it). The process crashes
-    hard (does not die gracefully) and restarts.
-  
-  * Like the previous scenario, a worker process is working on a job with no
-    retry policy (that is, zc.async should not retry it). The process crashes
-    hard (does not die gracefully) and a sibling notices and takes over.
-
-- Retry forever policy
-
-  * A worker process is working on a job with the retry-forever policy and gets
-    a ConflictError during the commit.
-  
-  * A worker process is working on a job with the retry-forever policy and gets
-    a ClientDisconnected error.
-  
-  * A worker process is working on a job with the retry-forever policy. The
-    process dies gracefully and restarts.
-  
-  * Like the previous scenario, a worker process is working on a job with the
-    retry-forever policy. The process crashes hard (does not die gracefully)
-    and restarts.
-  
-  * Like the previous scenario, a worker process is working on a job with the
-    retry-forever policy. The process crashes hard (does not die gracefully)
-    and a sibling notices and takes over.
-
-We will close with customizations:
-
-- custom retry policies, particularly for non-transactional tasks;
-
-- changing the default retry policy, per-process and per-agent; and
-
-- changing the default log level for queue jobs, callback jobs, and per-job.
-
 -------------------------
 Scenarios: Polling Errors
 -------------------------
@@ -343,7 +325,8 @@
     >>> lock1.acquire()
     True
     >>> lock2.acquire()
-    >>> def acquireLockAndchooseFirst(agent):
+    True
+    >>> def acquireLockAndChooseFirst(agent):
     ...     res = agent.queue.claim()
     ...     if res is not None:
     ...         lock2.release()
@@ -352,13 +335,23 @@
     ...
     >>> import zc.async.instanceuuid
     >>> import zc.async.interfaces
+    >>> import zc.async.testing
+    >>> import zc.async.dispatcher
+    >>> import pprint
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher, 0))
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
     >>> import transaction
     >>> _ = transaction.begin()
     >>> queues = root[zc.async.interfaces.KEY]
     >>> queue = queues['']
     >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
     >>> agent = da['main']
-    >>> agent.chooser = acquireLockAndchooseFirst
+    >>> agent.chooser = acquireLockAndChooseFirst
     >>> def returnSomething():
     ...     return 42
     ...
@@ -371,6 +364,7 @@
 same job.
 
     >>> lock2.acquire()
+    True
     >>> _ = transaction.begin()
     >>> job is queue.pull()
     True
@@ -386,7 +380,12 @@
     >>> import zc.async.dispatcher
     >>> dispatcher = zc.async.dispatcher.get()
     >>> import zc.async.testing
-    >>> zc.async.testing.get_poll(dispatcher)
+    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
 
 And if we put the job back, it will be performed.
 
@@ -408,29 +407,49 @@
     >>> lock2.locked()
     True
 
+    >>> agent.chooser = acquireLockAndChooseFirst
     >>> job = queue.put(returnSomething)
     >>> transaction.commit()
 
     >>> lock2.acquire()
+    True
     >>> import ZEO.Exceptions
     >>> def commit(self):
     ...     raise ZEO.Exceptions.ClientDisconnected()
     ...
     >>> import transaction
-    >>> old_commit = transaction.TranasctionManager.commit
-    >>> transaction.TranasctionManager.commit = commit
+    >>> old_commit = transaction.TransactionManager.commit
+    >>> transaction.TransactionManager.commit = commit
+    >>> import time
+    >>> sleep_requests = []
+    >>> def sleep(i):
+    ...     sleep_requests.append(i)
+    ...
+    >>> old_sleep = time.sleep
+    >>> time.sleep = sleep
+    >>> agent.chooser = zc.async.agent.chooseFirst
+    >>> transaction.commit()
     >>> lock1.release()
-    >>> zc.async.testing.get_poll(dispatcher)
-    >>> transaction.TranasctionManager.commit = old_commit
+    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [(..., 'unnamed')],
+                   'size': 3}}}
+    >>> transaction.TransactionManager.commit = old_commit
     >>> zc.async.testing.wait_for_result(job)
     42
+    >>> bool(sleep_requests)
+    True
 
 Here's another variant that mimics being unable to read the storage during a
-poll, and then recouperating.
+poll, and then recuperating.
 
-    >>> error_raised = 0
+    >>> error_raised = False
     >>> def raiseDisconnectedThenChooseFirst(agent):
+    ...     global error_raised
     ...     if not error_raised:
+    ...         error_raised = True
     ...         raise ZEO.Exceptions.ClientDisconnected()
     ...     return agent.queue.claim()
     >>> agent.chooser = raiseDisconnectedThenChooseFirst
@@ -439,99 +458,395 @@
     ...
     >>> job = queue.put(returnSomething)
     >>> transaction.commit()
-    >>> zc.async.testing.get_poll(dispatcher)
+    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
+    {'': {'main': {'active jobs': [],
+                   'error': <zc.twist.Failure ...ClientDisconnected>,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
     >>> zc.async.testing.wait_for_result(job)
     42
 
-------------------------------
-Scenarios: Internal Job Errors
-------------------------------
+-----------------------------
+Scenarios: Job-Related Errors
+-----------------------------
 
-Queue Job
----------
+Graceful Shutdown During Job
+----------------------------
 
-Callback Job
-------------
+First let's consider how a failed job with a callback or two is handled when
+the dispatcher dies.
 
--------------------------------
-Scenarios: Default Retry Policy
--------------------------------
+Here we start a job.
 
-ConflictError
--------------
+    >>> import zope.component
+    >>> import transaction
+    >>> import zc.async.interfaces
+    >>> import zc.async.testing
+    >>> import zc.async.dispatcher
 
-ClientDisconnected
-------------------
+    >>> queue = root[zc.async.interfaces.KEY]['']
+    >>> lock = threading.Lock()
+    >>> lock.acquire()
+    True
+    >>> fail_flag = True
+    >>> def wait_for_me():
+    ...     global fail_flag
+    ...     if fail_flag:
+    ...         fail_flag = False
+    ...         lock.acquire()
+    ...         lock.release() # so we can use the same lock again later
+    ...         raise SystemExit() # this will cause the worker thread to exit
+    ...     else:
+    ...         return 42
+    ...
+    >>> def handle_result(result):
+    ...     return 'I got result %r' % (result,)
+    ...
+    >>> job = queue.put(wait_for_me)
+    >>> callback_job = job.addCallback(handle_result)
+    >>> transaction.commit()
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> wait_for_start(job)
 
-Graceful Shutdown
------------------
+In this scenario, ``wait_for_me`` is a job that, the first time it is run, will
+"unexpectedly" be lost while the dispatcher stops working. ``handle_result``
+will simply show us that callbacks will be called successfully.
 
-Hard Crash
-----------
+The job has started. Now, the dispatcher suddenly dies without the thread
+performing ``wait_for_me`` getting a chance to finish. For our first example,
+let's give the dispatcher a graceful exit. The dispatcher gets a chance to
+clean up its dispatcher agents, and a job that calls the original job's
+``handleInterrupt`` method goes into the queue.
 
-Hard Crash and Sibling
-----------------------
+    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+    >>> wait_to_deactivate(dispatcher)
+    >>> _ = transaction.begin()
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+    >>> len(queue)
+    1
+    >>> interrupt_job = queue[0]
+    >>> interrupt_job # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[0].callable # doctest: +ELLIPSIS
+    <bound method Job.handleInterrupt of <...Job ... ``...wait_for_me()``>>
 
---------------------------
-Scenarios: No Retry Policy
---------------------------
+Now when the process starts back up again, ``handleInterrupt`` checks with the
+default retry policy as to what should be done. The policy requests that the
+job be retried, so the job is put back in the queue and called again normally.
 
-ConflictError
--------------
+    >>> old_dispatcher = dispatcher
+    >>> zc.async.dispatcher.clear()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> zc.async.testing.wait_for_result(interrupt_job)
 
-ClientDisconnected
-------------------
+Now we need to wait for the job.
 
-Graceful Shutdown
------------------
+    >>> zc.async.testing.wait_for_result(job)
+    42
+    >>> callback_job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> callback_job.result
+    'I got result 42'
 
-Hard Crash
-----------
+The job now has a retry policy with some values that are not (yet) part of the
+interface but are still worth showing here.
 
-Hard Crash and Sibling
-----------------------
+    >>> policy = job.getRetryPolicy()
+    >>> policy.data.get('interruptions')
+    1
 
--------------------------------
-Scenarios: Retry-Forever Policy
--------------------------------
+This shows that the policy registered one interruption. [#cleanup1]_
 
-ConflictError
--------------
+Hard Crash During Job
+---------------------
 
-ClientDisconnected
-------------------
+Our next catastrophe only changes one aspect to the previous one: the
+dispatcher does not stop gracefully, and does not have a chance to clean up its
+active jobs.  It is a "hard" crash.
 
-Graceful Shutdown
------------------
+To show this, we will start a job, simulate the dispatcher dying "hard," and
+restart it so it can clean up.
 
-Hard Crash
-----------
+So, first we start a long-running job in the dispatcher.
 
-Hard Crash and Sibling
-----------------------
+    >>> lock.acquire()
+    True
+    >>> fail_flag = True
+    >>> job = queue.put(wait_for_me)
+    >>> callback_job = job.addCallback(handle_result)
+    >>> transaction.commit()
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> wait_for_start(job)
 
---------------
-Customizations
---------------
+Now we'll "crash" the dispatcher.
 
-Changing the Default Retry Policy
----------------------------------
+    >>> dispatcher.activated = False # this will make polling stop, without
+    ...                              # cleanup
+    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
+    >>> dispatcher.thread.join(3)
 
-Creating a New Retry Policy
----------------------------
+Hard crashes can be detected because the dispatchers write datetimes to the
+database every few polls. A given dispatcher instance does this for each queue
+on a ``DispatcherAgents`` object available in ``queue.dispatchers[UUID]``,
+where ``UUID`` is the uuid of that dispatcher.
 
-Changing the Log Level
-----------------------
+The ``DispatcherAgents`` object has four pertinent attributes:
+``ping_interval``, ``ping_death_interval``, ``last_ping.value``, and ``dead``.
+About every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is
+supposed to write a ``datetime`` to ``last_ping.value``. If the
+``last_ping.value`` plus the ``ping_death_interval`` (also a ``timedelta``) is
+older than now, the dispatcher is considered to be ``dead``, and old jobs
+should be cleaned up.
 
+The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
+defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
+least two or three poll intervals (``zc.async.dispatcher.get().poll_interval``)
+greater than the ``ping_interval``.
 
+The ping hasn't timed out yet, so the dispatcher isn't considered dead yet.
 
+    >>> _ = transaction.begin()
+    >>> import zc.async.instanceuuid
+    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+    >>> da.ping_death_interval
+    datetime.timedelta(0, 60)
+    >>> da.ping_interval
+    datetime.timedelta(0, 30)
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
 
+Therefore, the job is still sitting around in the dispatcher's pile in the
+database (the ``main`` key is for the ``main`` agent installed in this
+dispatcher in the set up for these examples).
 
+    >>> job in da['main']
+    True
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
 
+Let's start our dispatcher up again.
 
+    >>> old_dispatcher = dispatcher
+    >>> zc.async.dispatcher.clear()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+    >>> dispatcher = zc.async.dispatcher.get()
 
+Initially, it's going to be a bit confused, because it sees that the
+DispatcherAgents object is ``activated``, and not ``dead``. It can't tell if
+there's another process using its same UUID, or if it is looking at the result
+of a hard crash.
 
+    >>> zc.async.testing.wait_for_result(job, seconds=1)
+    Traceback (most recent call last):
+    ...
+    AssertionError: job never completed
+    >>> zc.async.testing.get_poll(dispatcher, seconds=1)
+    {'': None}
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'ERROR':
+    ...         break
+    ... else:
+    ...     assert False, 'did not find log'
+    ...
+    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    UUID ... already activated in queue  (oid 4): another process?
+    (To stop poll attempts in this process, set
+    ``zc.async.dispatcher.get().activated = False``.  To stop polls
+    permanently, don't start a zc.async.dispatcher!)
 
+To speed up the realization of our dispatcher that the previous activation is
+``dead``, we'll set the ping_death_interval to just one second.
+
+    >>> _ = transaction.begin()
+    >>> da.dead
+    False
+    >>> job in da['main']
+    True
+    >>> len(queue)
+    0
+    >>> import datetime
+    >>> da.ping_death_interval = datetime.timedelta(seconds=1)
+    >>> da.dead
+    True
+    >>> bool(da.activated)
+    True
+    >>> transaction.commit()
+
+After the next poll, the dispatcher will have cleaned up its old tasks in the
+same way we saw in the previous example. The job's ``handleInterrupt`` method
+will be called, and the job will be put back in the queue to be retried. The
+DispatcherAgents object is no longer dead, because it is tied to the new
+instance of the dispatcher.
+
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> def wait_for_pending(job):
+    ...     for i in range(60):
+    ...         t = transaction.begin()
+    ...         if job.status == zc.async.interfaces.PENDING:
+    ...             break
+    ...         time_sleep(0.1)
+    ...     else:
+    ...         assert False, 'job never pending'
+    >>> wait_for_pending(job)
+    >>> job in da['main']
+    False
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+    >>> queue[0] is job
+    True
+
+Now we need to wait for the job.
+
+    >>> zc.async.testing.wait_for_result(job)
+    42
+    >>> callback_job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> callback_job.result
+    'I got result 42'
+    >>> policy = job.getRetryPolicy()
+    >>> policy.data.get('interruptions')
+    1
+
+The dispatcher cleaned up its own "hard" crash.
+
+[#cleanup2]_
+
+Hard Crash During Job with Sibling Recovery
+-------------------------------------------
+
+Our next catastrophe is the same as the one before, except, after one
+dispatcher's hard crash, another dispatcher is around to clean up the dead
+jobs.
+
+To show this, we will start a job, start a second dispatcher, simulate the
+first dispatcher dying "hard," and watch the second dispatcher clean up
+after the first one.
+
+So, first we start a long-running job in the dispatcher as before.
+
+    >>> lock.acquire()
+    True
+    >>> fail_flag = True
+    >>> job = queue.put(wait_for_me)
+    >>> callback_job = job.addCallback(handle_result)
+    >>> transaction.commit()
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> wait_for_start(job)
+
+Now we'll start up an alternate dispatcher.
+
+    >>> import uuid
+    >>> alt_uuid = uuid.uuid1()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...     poll_interval=0.5, uuid=alt_uuid)(
+    ...     zc.async.interfaces.DatabaseOpened(db))
+    >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
+
+We're also going to set the main dispatcher's ``ping_death_interval`` back to
+60 seconds so we can see some polls in the alternate dispatcher before it gets
+around to cleaning up.
+
+    >>> da.ping_death_interval = datetime.timedelta(seconds=60)
+    >>> transaction.commit()
+
+Now we'll "crash" the dispatcher.
+
+    >>> dispatcher.activated = False # this will make polling stop, without
+    ...                              # cleanup
+    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
+    >>> dispatcher.thread.join(3)
+
+As discussed in the previous example, the ping hasn't timed out yet, so the
+alternate dispatcher can't know that the first one is dead. Therefore, the job
+is still sitting around in the old dispatcher's pile in the database.
+
+    >>> _ = transaction.begin()
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+    >>> alt_poll_1 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> _ = transaction.begin()
+    >>> job in da['main']
+    True
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+    >>> alt_poll_2 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> _ = transaction.begin()
+    >>> job in da['main']
+    True
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+
+Above, the ping_death_interval was returned to the default of 60 seconds. To
+speed up the realization of our second dispatcher that the first one is dead,
+we'll set the ping_death_interval back down to just one second.
+
+    >>> da.ping_death_interval
+    datetime.timedelta(0, 60)
+    >>> import datetime
+    >>> da.ping_death_interval = datetime.timedelta(seconds=1)
+    >>> da.dead
+    True
+    >>> bool(da.activated)
+    True
+    >>> transaction.commit()
+
+After the second dispatcher gets a poll--a chance to notice--it will have
+cleaned up the first dispatcher's old tasks in the same way we saw in the
+previous example.  The job's ``handleInterrupt`` method will be called, which
+in this case will put it back in the queue to be claimed and performed.
+
+    >>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> _ = transaction.begin()
+    >>> job in da['main']
+    False
+    >>> bool(da.activated)
+    False
+    >>> da.dead
+    True
+    >>> wait_for_pending(job)
+    >>> queue[0] is job
+    True
+
+Now we need to wait for the job.
+
+    >>> zc.async.testing.wait_for_result(job)
+    42
+    >>> callback_job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> callback_job.result
+    'I got result 42'
+
+The sibling, then, was able to clean up the mess left by the "hard" crash of
+the first dispatcher.
+
+[#cleanup3]_
+
+Other Job-Related Errors
+------------------------
+
+Other problems--errors when performing or committing jobs--are handled within
+jobs, getting the decisions from retry policies as described above.  These
+are demonstrated in the job.txt document.
+
 .. ......... ..
 .. Footnotes ..
 .. ......... ..
@@ -541,8 +856,8 @@
     >>> import ZODB.FileStorage
     >>> storage = ZODB.FileStorage.FileStorage(
     ...     'main.fs', create=True)
-    >>> from ZODB.DB import DB 
-    >>> db = DB(storage) 
+    >>> from ZODB.DB import DB
+    >>> db = DB(storage)
     >>> conn = db.open()
     >>> root = conn.root()
     >>> import zc.async.configure
@@ -560,13 +875,14 @@
     >>> import transaction
     >>> _ = transaction.begin()
 
-    >>> import time
+    >>> from time import sleep as time_sleep # we import in this manner so
+    ... # that our testing monkey-patch of time.sleep does not affect our tests
     >>> def wait_for_start(job):
     ...     for i in range(60):
     ...         t = transaction.begin()
     ...         if job.status == zc.async.interfaces.ACTIVE:
     ...             break
-    ...         time.sleep(0.1)
+    ...         time_sleep(0.1)
     ...     else:
     ...         assert False, 'job never started'
 
@@ -574,6 +890,40 @@
     ...     for i in range(60):
     ...         if dispatcher.activated == False:
     ...             break
-    ...         time.sleep(0.1)
+    ...         time_sleep(0.1)
     ...     else:
-    ...         assert False, 'dispatcher never deactivated'
\ No newline at end of file
+    ...         assert False, 'dispatcher never deactivated'
+
+.. [#cleanup1]
+
+    >>> lock.release()
+    >>> old_dispatcher.thread.join(3)
+    >>> old_dispatcher.dead_pools[0].threads[0].join(3)
+
+.. [#cleanup2]
+
+    >>> lock.release()
+    >>> old_dispatcher.thread.join(3)
+    >>> for queue_pools in old_dispatcher.queues.values():
+    ...     for name, pool in queue_pools.items():
+    ...         pool.setSize(0)
+    ...         for thread in pool.threads:
+    ...             thread.join(3)
+    ...
+    -3
+
+.. [#cleanup3]
+
+    >>> lock.release()
+    >>> dispatcher.thread.join(3)
+    >>> for queue_pools in dispatcher.queues.values():
+    ...     for name, pool in queue_pools.items():
+    ...         pool.setSize(0)
+    ...         for thread in pool.threads:
+    ...             thread.join(3)
+    ...
+    -3
+    >>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
+    >>> alt_dispatcher.thread.join(3)
+    >>> alt_dispatcher.dead_pools[0].threads[0].join(3)
+    >>> time.sleep = old_sleep

Modified: zc.async/branches/dev/src/zc/async/configure.py
===================================================================
--- zc.async/branches/dev/src/zc/async/configure.py	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/configure.py	2008-06-17 02:38:06 UTC (rev 87444)
@@ -56,4 +56,4 @@
 def base():
     # see comment in ``minimal``, above
     minimal()
-    zope.component.provideAdapter(zc.twist.connection)
\ No newline at end of file
+    zope.component.provideAdapter(zc.twist.connection)

Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py	2008-06-17 02:38:06 UTC (rev 87444)
@@ -21,6 +21,7 @@
 import twisted.python.failure
 import twisted.internet.defer
 import ZODB.POSException
+import ZEO.Exceptions
 import ZODB.utils
 import BTrees
 import transaction
@@ -55,7 +56,7 @@
 
     def __init__(self):
         self._event = threading.Event()
-    
+
     def setResult(self, value):
         self.result = value
         self._event.set()
@@ -119,6 +120,9 @@
 class AgentThreadPool(object):
 
     _size = 0
+    initial_backoff = 5
+    incremental_backoff = 5
+    maximum_backoff = 60
 
     def __init__(self, dispatcher, name, size):
         self.dispatcher = dispatcher
@@ -134,22 +138,52 @@
         local.dispatcher = self.dispatcher
         conn = self.dispatcher.db.open()
         try:
-            job = self.queue.get()
-            while job is not None:
-                identifier, dbname, info = job
+            job_info = self.queue.get()
+            while job_info is not None:
+                identifier, dbname, info = job_info
                 info['thread'] = thread.get_ident()
                 info['started'] = datetime.datetime.utcnow()
                 zc.async.utils.tracelog.info(
                     'starting in thread %d: %s',
                     info['thread'], info['call'])
+                backoff = self.initial_backoff
+                conflict_retry_count = 0
                 try:
-                    transaction.begin()
-                    if dbname is None:
-                        local_conn = conn
-                    else:
-                        local_conn = conn.get_connection(dbname)
-                    job = local_conn.get(identifier)
-                    local.job = job
+                    while 1:
+                        try:
+                            transaction.begin()
+                            if dbname is None:
+                                local_conn = conn
+                            else:
+                                local_conn = conn.get_connection(dbname)
+                            job = local_conn.get(identifier)
+                            # this setstate should trigger any initial problems
+                            # within the try/except retry structure here.
+                            local_conn.setstate(job)
+                            local.job = job
+                        except ZEO.Exceptions.ClientDisconnected:
+                            zc.async.utils.log.info(
+                                'ZEO client disconnected while trying to '
+                                'get job %d in db %s; retrying in %d seconds',
+                                ZODB.utils.u64(identifier), dbname or '',
+                                backoff)
+                            time.sleep(backoff)
+                            backoff = min(self.maximum_backoff,
+                                          backoff + self.incremental_backoff)
+                        except ZODB.POSException.TransactionError:
+                            # continue, i.e., try again
+                            conflict_retry_count += 1
+                            if (conflict_retry_count == 1 or
+                                not conflict_retry_count % 5):
+                                zc.async.utils.log.warning(
+                                    '%d transaction error(s) while trying to '
+                                    'get job %d in db %s',
+                                    conflict_retry_count,
+                                    ZODB.utils.u64(identifier), dbname or '',
+                                    exc_info=True)
+                            # now ``while 1`` loop will continue, to retry
+                        else:
+                            break
                     try:
                         job() # this does the committing and retrying, largely
                     except zc.async.interfaces.BadStatusError:
@@ -210,14 +244,14 @@
                 zc.async.utils.tracelog.info(
                     'completed in thread %d: %s',
                     info['thread'], info['call'])
-                job = self.queue.get()
+                job_info = self.queue.get()
         finally:
             conn.close()
             if self.dispatcher.activated:
                 # this may cause some bouncing, but we don't ever want to end
                 # up with fewer than needed.
                 self.dispatcher.reactor.callFromThread(self.setSize)
-    
+
     def setSize(self, size=None):
         # this should only be called from the thread in which the reactor runs
         # (otherwise it needs locks)

Modified: zc.async/branches/dev/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.txt	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/dispatcher.txt	2008-06-17 02:38:06 UTC (rev 87444)
@@ -8,29 +8,29 @@
 must have several methods:
 
 class IReactor(zope.interface.Interface):
-    
+
     def callFromThread(callable, *args, **kw):
         """have callable run in reactor's thread, by reactor, ASAP.
-        
+
         Intended to be called from a thread other than the reactor's main
         loop.
         """
-    
+
     def callInThread(callable, *args, **kw):
         """have callable run in a separate thread, ASAP.
-        
+
         Must be called in same thread as reactor's main loop.
         """
-    
+
     def callLater(seconds, callable, *args, **kw):
         """have callable run in reactor at least <seconds> from now
-        
+
         Must be called in same thread as reactor's main loop.
         """
 
     def addSystemEventTrigger(phase, event, callable, *args, **kw):
         """Install a callable to be run in phase of event.
-        
+
         must support phase 'before', and event 'shutdown'.
         """
 
@@ -140,7 +140,7 @@
 - The queue fired events to announce the dispatcher's registration and
   activation.  We could have registered subscribers for either or both
   of these events to create agents.
-  
+
   Note that the dispatcher in queue.dispatchers is a persistent
   representative of the actual dispatcher: they are different objects.
 
@@ -161,17 +161,17 @@
     True
 
 - The dispatcher made its first ping. A ping means that the dispatcher changes
-  a datetime to record that it is alive. 
+  a datetime to record that it is alive.
 
-    >>> queue.dispatchers[dispatcher.UUID].last_ping is not None
+    >>> queue.dispatchers[dispatcher.UUID].last_ping.value is not None
     True
 
-  The dispatcher needs to update its last_ping after every ``ping_interval``
-  seconds.  If it has not updated the last_ping after ``ping_death_interval``
-  then the dispatcher is considered to be dead, and active jobs in the
-  dispatcher's agents are ended (and given a chance to respond to that status
-  change, so they can put themselves back on the queue to be restarted if
-  desired).
+  The dispatcher needs to update its ``last_ping.value`` after every
+  ``ping_interval`` seconds. If it has not updated the ``last_ping.value``
+  after ``ping_death_interval`` then the dispatcher is considered to be dead,
+  and active jobs in the dispatcher's agents are ended (and given a chance to
+  respond to that status change, so they can put themselves back on the queue
+  to be restarted if desired).
 
     >>> queue.dispatchers[dispatcher.UUID].ping_interval
     datetime.timedelta(0, 30)
@@ -180,7 +180,7 @@
 
 - We have some log entries.  (We're using some magic log handlers inserted by
   setup code in tests.py here.)
-  
+
     >>> print event_logs # doctest: +ELLIPSIS
     zc.async.events INFO
       attempting to activate dispatcher ...
@@ -282,8 +282,8 @@
         {'main':
           {'active jobs': [], 'error': None,
            'new jobs': [(..., 'unnamed')], 'len': 0, 'size': 3}}}
-    
 
+
 [#getPollInfo]_ Notice our ``new jobs`` from the poll and the log has a value
 in it now. We can get some information about that job from the dispatcher.
 
@@ -350,8 +350,18 @@
 
     >>> badjob.parent # doctest: +ELLIPSIS
     <zc.async.agent.Agent object at 0x...>
-    >>> len(badjob.parent)
-    0
+    >>> import time
+    >>> for i in range(60):
+    ...     if len(badjob.parent) == 0:
+    ...         print 'yay'
+    ...         break
+    ...     else:
+    ...         time.sleep(0.1)
+    ...         _ = transaction.begin()
+    ... else:
+    ...     print 'oops'
+    ...
+    yay
 
 ``zc.async.local`` also allows some fun tricks.  Your callable can access the
 queue, perhaps to put another job in.
@@ -371,7 +381,7 @@
 of the last database sync for the thread's connection (at transaction
 boundaries).  This function is ``zc.async.local.getJob()``, and is seen below.
 
-It can also get and set job annotations *live, in another connection*. 
+It can also get and set job annotations *live, in another connection*.
 This allows you to send messages about job progress, or get live
 information about whether you should change or stop your work, for
 instance.
@@ -452,8 +462,8 @@
     >>> import ZODB.FileStorage
     >>> storage = ZODB.FileStorage.FileStorage(
     ...     'main.fs', create=True)
-    >>> from ZODB.DB import DB 
-    >>> db = DB(storage) 
+    >>> from ZODB.DB import DB
+    >>> db = DB(storage)
     >>> conn = db.open()
     >>> root = conn.root()
     >>> import zc.async.configure
@@ -461,7 +471,7 @@
 
 .. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
     find this poll information also.
-    
+
     >>> dispatcher.getPollInfo(at=poll.key) is poll
     True
     >>> dispatcher.getPollInfo(at=poll.utc_timestamp) is poll
@@ -480,7 +490,7 @@
 .. [#show_error] OK, so you want to see a verbose traceback?  OK, you asked
     for it. We're eliding more than 90% of this, and this is a small one,
     believe it or not. Rotate your logs!
-    
+
     Notice that all of the values in the logs are reprs.
 
     >>> bad_job = queue.put(
@@ -489,8 +499,8 @@
 
     >>> wait_for_result(bad_job)
     <zc.twist.Failure exceptions.TypeError>
-    
-    >>> for r in reversed(trace_logs.records):
+
+    >>> for r in reversed(event_logs.records):
     ...     if r.levelname == 'ERROR':
     ...         break
     ... else:
@@ -500,12 +510,9 @@
     <zc.async.job.Job (oid ..., db 'unnamed')
      ``<built-in function mul>(14, None)``> failed with traceback:
     *--- Failure #... (pickled) ---
-    .../zc/async/job.py:...: _call_with_retry(...)
+    .../zc/async/job.py:...: __call__(...)
      [ Locals ]...
      ( Globals )...
-    .../zc/async/job.py:...: <lambda>(...)
-     [ Locals ]...
-     ( Globals )...
     exceptions.TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'
     *--- End of Failure #... ---
     <BLANKLINE>

Modified: zc.async/branches/dev/src/zc/async/interfaces.py
===================================================================
--- zc.async/branches/dev/src/zc/async/interfaces.py	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/interfaces.py	2008-06-17 02:38:06 UTC (rev 87444)
@@ -28,12 +28,12 @@
 except ImportError:
     class IDatabaseOpenedEvent(zope.interface.Interface):
         """The main database has been opened."""
-    
+
         database = zope.interface.Attribute("The main database.")
-    
+
     class DatabaseOpened(object):
         zope.interface.implements(IDatabaseOpenedEvent)
-    
+
         def __init__(self, database):
             self.database = database
 
@@ -51,31 +51,31 @@
 
 class IReactor(zope.interface.Interface):
     """This describes what the dispatcher expects of the reactor.
-    
+
     The reactor does not need to actually provide this interface."""
-    
+
     def callFromThread(callable, *args, **kw):
         """have callable run in reactor's thread, by reactor, ASAP.
-        
+
         Intended to be called from a thread other than the reactor's main
         loop.
         """
-    
+
     def callInThread(callable, *args, **kw):
         """have callable run in a separate thread, ASAP.
-        
+
         Must be called in same thread as reactor's main loop.
         """
-    
+
     def callLater(seconds, callable, *args, **kw):
         """have callable run in reactor at least <seconds> from now
-        
+
         Must be called in same thread as reactor's main loop.
         """
 
     def addSystemEventTrigger(phase, event, callable, *args, **kw):
         """Install a callable to be run in phase of event.
-        
+
         must support phase 'before', and event 'shutdown'.
         """
 
@@ -83,10 +83,35 @@
         """run callable now if running, or when started.
         """
 
+class IRetryPolicy(zope.interface.Interface):
+    def jobError(failure, data_cache):
+        """whether and how to retry after an error while performing job.
 
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to reschedule
+        for immediately, before any pending calls in the queue."""
+
+    def commitError(failure, data_cache):
+        """whether to retry after trying to commit a job's successful result.
+
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to reschedule
+        for immediately, before any pending calls in the queue."""
+
+    def interrupted():
+        """whether to retry after a dispatcher dies when job was in progress.
+
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue.  An empty timedelta means to reschedule
+        for immediately, before any pending calls in the queue."""
+
+    def updateData(data_cache):
+        """right before committing a job, retry is given a chance to stash
+        information it has saved in the data_cache."""
+
 class IObjectEvent(zope.interface.Interface):
     """Event happened to object"""
-    
+
     object = zope.interface.Attribute('the object')
 
 class AbstractObjectEvent(object):
@@ -136,9 +161,13 @@
 
 class AbortedError(Exception):
     """An explicit abort, as generated by the default behavior of
-    IJob.fail"""
+    IJob.handleInterrupt"""
 
 
+class TimeoutError(Exception):
+    """A time out caused by a ``begin_by`` value."""
+
+
 class BadStatusError(Exception):
     """The job is not in the status it should be for the call being made.
     This is almost certainly a programmer error."""
@@ -168,7 +197,7 @@
         """One of constants defined in zc.async.interfaces:
         NEW, PENDING, ASSIGNED, ACTIVE, CALLBACKS, COMPLETED.
 
-        NEW means not added to a queue and not yet called.        
+        NEW means not added to a queue and not yet called.
 PENDING means added to a queue but not an agent, and not yet called.
         ASSIGNED means added to an agent and not yet called.
         ACTIVE means in the process of being called.
@@ -207,10 +236,8 @@
         self.args for the call, and any kwargs effectively update self.kwargs
         for the call."""
 
-    def fail(e=AbortedError):
-        """Fail this job, with option error e.  May only be called when
-        job is in PENDING or ACTIVE states, or else raises BadStatusError.
-        If e is not provided,"""
+    def handleInterrupt():
+        """use IRetryPolicy to decide whether to abort."""
 
     def resumeCallbacks():
         """Make all callbacks remaining for this job.  Any callbacks
@@ -229,7 +256,7 @@
 #         """a set of selected worker UUIDs.  If it is empty, it is
 #         interpreted as the set of all available workerUUIDs.  Only
 #         workers with UUIDs in the set may perform it.
-# 
+#
 #         If a worker would have selected this job for a run, but the
 #         difference of selected_workerUUIDs and excluded_workerUUIDs
 #         stopped it, it is responsible for verifying that the effective
@@ -250,26 +277,26 @@
 
 class IAgent(zope.interface.common.sequence.IFiniteSequence):
     """Responsible for picking jobs and keeping track of them.
-    
+
     An agent is a persistent object in a queue that is associated with a
     dispatcher and is responsible for picking jobs and keeping track of
     them. Zero or more agents within a queue can be associated with a
-    dispatcher. 
-    
+    dispatcher.
+
     Each agent for a given dispatcher is identified uniquely with a
     name.  A fully (universally) unique identifier for the agent can be
     obtained by combining the key of the agent's queue in the main queue
     mapping at the ZODB root; the UUID of the agent's dispatcher; and
     the agent's name.
     """
-    
+
     size = zope.interface.Attribute(
         """The maximum number of jobs this agent should have active at a time.
         """)
 
     name = zope.interface.Attribute(
         """The name for this agent.  Unique within its dispatcher's jobs for
-        its queue.  Can be used to obtain agent with 
+        its queue.  Can be used to obtain agent with
         queue.dispatchers[*dispatcher UUID*][*name*].""")
 
     completed = zope.interface.Attribute(
@@ -305,9 +332,9 @@
         Remember that IJobs are not guaranteed to be run in order
         added to a queue.  If you need sequencing, use
         IJob.addCallbacks.
-        
+
         item must be an IJob, or be adaptable to that interface.
-        begin_after must be None (to leave the job's current value) or a 
+        begin_after must be None (to leave the job's current value) or a
         datetime.datetime.  begin_by must be None (to leave it alone) or a
         datetime.timedelta of the duration after the begin_after.
 
@@ -326,11 +353,14 @@
         queue on the `assignerUUID`.
         """
 
+    def putBack(item):
+        """Return a previously claimed job to the top of the queue."""
+
     def pull(index=0):
         """Remove and return a job, by default from the front of the queue.
 
         Raise IndexError if index does not exist.
-        
+
         This is the blessed way to remove an unclaimed job from the queue so
         that dispatchers will not try to perform it.
         """
@@ -356,7 +386,7 @@
     def ping(UUID):
         """responsible for setting ping time if necessary for this
         dispatcher agent, and for decommissioning dead dispatchers for
-        the next highest dispatcher (sorted by UUID) if its (last_ping +
+        the next highest dispatcher (sorted by UUID) if its (last_ping.value +
         ping_interval + ping_death_interval) < now.  If this is the
         highest dispatcher UUID, cycles around to lowest."""
 

Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/job.py	2008-06-17 02:38:06 UTC (rev 87444)
@@ -77,35 +77,9 @@
                     elif status == zc.async.interfaces.CALLBACKS:
                         a.resumeCallbacks()
 
-class IRetries(zope.interface.Interface): # XXX move
-    def jobError(failure, data_cache):
-        """whether and how to retry after an error while performing job.
-        
-        return boolean as to whether to retry, or a datetime or timedelta to
-        reschedule the job in the queue.  An empty timedelta means to rescedule
-        for immediately, before any pending calls in the queue."""
-
-    def transactionError(failure, data_cache):
-        """whether to retry after trying to commit a job's successful result.
-        
-        return boolean as to whether to retry, or a datetime or timedelta to
-        reschedule the job in the queue.  An empty timedelta means to rescedule
-        for immediately, before any pending calls in the queue."""
-
-    def interrupted():
-        """whether to retry after a dispatcher dies when job was in progress.
-        
-        return boolean as to whether to retry, or a datetime or timedelta to
-        reschedule the job in the queue.  An empty timedelta means to rescedule
-        for immediately, before any pending calls in the queue."""
-
-    def updateData(data_cache):
-        """right before committing a job, retry is given a chance to stash
-        information it has saved in the data_cache."""
-
-class Retries(persistent.Persistent): # default for '' IRetries
+class RetryCommonFourTimes(persistent.Persistent): # default
     zope.component.adapts(zc.async.interfaces.IJob)
-    zope.interface.implements(zc.async.interfaces.IRetries)
+    zope.interface.implements(zc.async.interfaces.IRetryPolicy)
 
     # exceptions, data_cache key, max retry, initial backoff seconds,
     # incremental backoff seconds, max backoff seconds
@@ -116,13 +90,14 @@
          5, 0, 0, 0),
     )
     transaction_exceptions = internal_exceptions
-    max_interruptions = 10
+    max_interruptions = 9
+    log_every = 5
 
     def __init__(self, job):
         self.parent = self.__parent__ = job
         self.data = BTrees.family32.OO.BTree()
 
-    def updateData(data_cache):
+    def updateData(self, data_cache):
         if 'first_active' in self.data and 'first_active' in data_cache:
             data_cache.pop('first_active')
         self.data.update(data_cache)
@@ -130,7 +105,7 @@
     def jobError(self, failure, data_cache):
         return self._process(failure, data_cache, self.internal_exceptions)
 
-    def transactionError(self, failure, data_cache):
+    def commitError(self, failure, data_cache):
         return self._process(failure, data_cache, self.transaction_exceptions)
 
     def _process(self, failure, data_cache, exceptions):
@@ -139,7 +114,15 @@
             if failure.check(*exc) is not None:
                 count = data_cache.get(key, 0) + 1
                 if max_count is not None and count >= max_count:
+                    zc.async.utils.tracelog.info(
+                        'Retry policy for job %r is not retrying after %d '
+                        'counts of %s occurrences', self.parent, count, key)
                     return False
+                elif count==1 or not count % self.log_every:
+                    zc.async.utils.tracelog.info(
+                        'Retry policy for job %r requests another attempt '
+                        'after %d counts of %s occurrences', self.parent,
+                        count, key)
                 backoff = min(max_backoff,
                               (init_backoff + (count-1) * incr_backoff))
                 if backoff:
@@ -154,23 +137,34 @@
     def interrupted(self):
         if 'first_active' not in self.data:
             self.data['first_active'] = self.parent.active_start
-        ct = self.data['interruptions'] = self.data.get('interruptions', 0) + 1
-        return self.max_interruptions is None or ct <= self.max_interruptions
+        count = self.data['interruptions'] = self.data.get('interruptions', 0) + 1
+        if self.max_interruptions is None or count <= self.max_interruptions:
+            if count==1 or not count % self.log_every:
+                zc.async.utils.tracelog.info(
+                    'Retry policy for job %r requests another attempt '
+                    'after %d interrupts', self.parent, count)
+            return True
+        else:
+            zc.async.utils.tracelog.info(
+                'Retry policy for job %r is not retrying after %d '
+                'interrupts', self.parent, count)
+            return False
 
-class RetrySystemErrorsForever(Retries): # default for 'callback' IRetries
+
+class RetryCommonForever(RetryCommonFourTimes):
     # retry on ZEO failures and Transaction errors during the job forever
-    # retry on transactionErrors and interrupteds forever.
+    # retry on commitErrors and interrupteds forever.
     internal_exceptions = (
         ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
          None, 5, 5, 60),
         ((ZODB.POSException.TransactionError,), 'transaction_error',
          None, 0, 0, 0),
     )
-    
+
     max_interruptions = None
 
-    def transactionError(self, failure, data_cache):
-        res = super(RetryForever, self).transactionError(failure, data_cache)
+    def commitError(self, failure, data_cache):
+        res = super(RetryCommonForever, self).commitError(failure, data_cache)
         if not res:
             # that just means we didn't record it.  We actually are going to
             # retry.
@@ -181,6 +175,33 @@
                 data_cache['first_active'] = self.parent.active_start
         return True # always retry
 
+class NeverRetry(persistent.Persistent):
+    zope.component.adapts(zc.async.interfaces.IJob)
+    zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+
+    def __init__(self, job):
+        self.parent = self.__parent__ = job
+
+    def updateData(self, data_cache):
+        pass
+
+    def jobError(self, failure, data_cache):
+        return False
+
+    def commitError(self, failure, data_cache):
+        return False
+
+    def interrupted(self):
+        return False
+
+def callback_retry_policy_factory(job):
+    res = zope.component.queryAdapter(
+        job, zc.async.interfaces.IRetryPolicy, 'callback')
+    if res is None:
+        res = RetryCommonForever(job)
+    return res
+
+
 class Job(zc.async.utils.Base):
 
     zope.interface.implements(zc.async.interfaces.IJob)
@@ -189,11 +210,31 @@
     _status = zc.async.interfaces.NEW
     _begin_after = _begin_by = _active_start = _active_end = None
     key = None
-    # default_retry_policy and retry_policy should either be name to adapt job
-    # to IRetries, or factory, or None.
-    default_retry_policy = ''
-    retry_policy = None
-    retries = None
+    retry_policy_factory = None
+    _retry_policy = None
+    def getRetryPolicy(self):
+        if self._retry_policy is not None:
+            return self._retry_policy
+        if self.retry_policy_factory is None:
+            # first try to look up adapter with name of ''; then if that fails
+            # use RetryCommonFourTimes
+            res = zope.component.queryAdapter(
+                self, zc.async.interfaces.IRetryPolicy, '')
+            if res is None:
+                res = RetryCommonFourTimes(self)
+        elif isinstance(self.retry_policy_factory, basestring):
+            res = zope.component.getAdapter(
+                self, zc.async.interfaces.IRetryPolicy,
+                self.retry_policy_factory)
+            # this may cause an error. We can't proceed because we don't know
+            # what to do, and it may be *critical* to know. Therefore, in
+            # _getRetry, we rely on never_fail to keep on sending critical
+            # errors in the log, and never stopping.
+        else:
+            res = self.retry_policy_factory(self)
+        self._retry_policy = res
+        return res
+
     default_error_log_level = logging.ERROR
     error_log_level = None
 
@@ -203,12 +244,6 @@
             return self.default_error_log_level
         return self.error_log_level
 
-    @property
-    def effective_retry_policy(self):
-        if self.retry_policy is None:
-            return self.default_retry_policy
-        return self.retry_policy
-
     assignerUUID = None
     _quota_names = ()
 
@@ -371,24 +406,28 @@
             self._callable_root.parent = self
 
     def addCallbacks(self, success=None, failure=None,
-                     error_log_level=None, retry_policy=None):
+                     error_log_level=None, retry_policy_factory=None):
         if success is not None or failure is not None:
             if success is not None:
                 success = zc.async.interfaces.IJob(success)
                 success.default_error_log_level = logging.CRITICAL
                 if error_log_level is not None:
                     success.error_log_level = error_log_level
-                success.default_retry_policy = 'callback'
-                if retry_policy is not None:
-                    success.retry_policy = retry_policy
+                if retry_policy_factory is not None:
+                    success.retry_policy_factory = retry_policy_factory
+                elif success.retry_policy_factory is None:
+                    success.retry_policy_factory = (
+                        callback_retry_policy_factory)
             if failure is not None:
                 failure = zc.async.interfaces.IJob(failure)
                 failure.default_error_log_level = logging.CRITICAL
                 if error_log_level is not None:
                     failure.error_log_level = error_log_level
-                failure.default_retry_policy = 'callback'
-                if retry_policy is not None:
-                    failure.retry_policy = retry_policy
+                if retry_policy_factory is not None:
+                    failure.retry_policy_factory = retry_policy_factory
+                elif failure.retry_policy_factory is None:
+                    failure.retry_policy_factory = (
+                        callback_retry_policy_factory)
             res = Job(success_or_failure, success, failure)
             if success is not None:
                 success.parent = res
@@ -400,18 +439,17 @@
             abort_handler = zc.async.interfaces.IJob(
                 completeStartedJobArguments)
             abort_handler.args.append(res)
-            res.addCallback(abort_handler, error_log_level)
+            res.addCallback(
+                abort_handler, error_log_level, retry_policy_factory)
             abort_handler.default_error_log_level = logging.CRITICAL
             if error_log_level is not None:
                 abort_handler.error_log_level = error_log_level
-            abort_handler.default_retry_policy = 'callback'
-            if retry_policy is not None:
-                abort_handler.retry_policy = retry_policy
         else:
             res = self
         return res
 
-    def addCallback(self, callback, error_log_level=None, retry_policy=None):
+    def addCallback(self, callback, error_log_level=None,
+                    retry_policy_factory=None):
         callback = zc.async.interfaces.IJob(callback)
         self.callbacks.put(callback)
         callback.parent = self
@@ -423,35 +461,29 @@
         callback.default_error_log_level = logging.CRITICAL
         if error_log_level is not None:
             callback.error_log_level = error_log_level
-        callback.default_retry_policy = 'callback'
-        if retry_policy is not None:
-            callback.retry_policy = retry_policy
+        if retry_policy_factory is not None:
+            callback.retry_policy_factory = retry_policy_factory
+        elif callback.retry_policy_factory is None:
+            callback.retry_policy_factory = callback_retry_policy_factory
         return callback
 
     def _getRetry(self, call_name, tm, *args):
-        def getRetry():
-            retries = self.retries
-            if retries is None:
-                retry_policy = self.effective_retry_policy
-                if retry_policy is None:
-                    return None # means, do not retry ever
-                elif isinstance(retry_policy, basestring):
-                    retries = zope.component.getAdapter(
-                        self, zc.async.interfaces.IRetries,
-                        name=retry_policy)
-                else:
-                    retries = retry_policy(self)
-                if retries is not None:
-                    self.retries = retries
-            call = getattr(retries, call_name, None)
-            if call is None:
-                zc.async.utils.log.error(
-                    'retries %r for %r does not have required %s method',
-                    retries, self, call_name)
-                return None
-            return call(*args)
-        identifier = 'getting %s retry for %r' % (call_name, self)
-        return zc.async.utils.never_fail(getRetry, identifier, tm)
+        # if we are after the time that we are supposed to begin_by, no retry
+        if (self.begin_by is not None and self.begin_after is not None and
+            self.begin_by + self.begin_after < datetime.datetime.now(pytz.UTC)):
+            return False
+        # we divide up the two ``never_fail`` calls so that retries in getting
+        # the policy don't affect actually calling the method.
+        identifier = 'getting retry policy for %r' % (self,)
+        policy = zc.async.utils.never_fail(self.getRetryPolicy, identifier, tm)
+        call = getattr(policy, call_name, None)
+        if call is None:
+            zc.async.utils.log.error(
+                'retry policy %r for %r does not have required %s method',
+                policy, self, call_name)
+            return None
+        identifier = 'getting result for %s retry for %r' % (call_name, self)
+        return zc.async.utils.never_fail(lambda: call(*args), identifier, tm)
 
     def __call__(self, *args, **kwargs):
         if self.status not in (zc.async.interfaces.NEW,
@@ -459,16 +491,20 @@
             raise zc.async.interfaces.BadStatusError(
                 'can only call a job with NEW or ASSIGNED status')
         tm = transaction.interfaces.ITransactionManager(self)
-        self._status = zc.async.interfaces.ACTIVE
-        self._active_start = datetime.datetime.now(pytz.UTC)
-        tm.commit()
-        effective_args = list(args)
-        effective_args[0:0] = self.args
-        effective_kwargs = dict(self.kwargs)
-        effective_kwargs.update(kwargs)
+        def prepare():
+            self._status = zc.async.interfaces.ACTIVE
+            self._active_start = datetime.datetime.now(pytz.UTC)
+            effective_args = list(args)
+            effective_args[0:0] = self.args
+            effective_kwargs = dict(self.kwargs)
+            effective_kwargs.update(kwargs)
+            return effective_args, effective_kwargs
+        identifier = 'preparing for call of %r' % (self,)
+        effective_args, effective_kwargs = zc.async.utils.never_fail(
+            prepare, identifier, tm)
         # this is the calling code.  It is complex and long because it is
         # trying both to handle exceptions reasonably, and to honor the
-        # IRetries interface for those exceptions.
+        # IRetryPolicy interface for those exceptions.
         data_cache = {}
         res = None
         while 1:
@@ -484,107 +520,172 @@
                 if isinstance(retry, (datetime.timedelta, datetime.datetime)):
                     identifier = (
                         'rescheduling %r as requested by '
-                        'associated IRetries %r' % (
-                            self, self.retries))
+                        'associated IRetryPolicy %r' % (
+                            self, self.getRetryPolicy()))
                     if self is zc.async.utils.never_fail(
                         lambda: self._reschedule(retry, data_cache),
                         identifier, tm):
                         return self
                 elif retry:
                     continue
+                # policy didn't exist or returned False or couldn't reschedule
             try:
-                self._set_result(res)
+                callback = self._set_result(res, tm, data_cache)
             except zc.async.utils.EXPLOSIVE_ERRORS:
                 tm.abort()
                 raise
             except:
+                failure = zc.twist.Failure()
+                tm.abort()
+                retry = self._getRetry('commitError', tm, failure, data_cache)
+                if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+                    identifier = (
+                        'rescheduling %r as requested by '
+                        'associated IRetryPolicy %r' % (
+                            self, self.getRetryPolicy()))
+                    if self is zc.async.utils.never_fail(
+                        lambda: self._reschedule(retry, data_cache),
+                        identifier, tm):
+                        return self
+                elif retry:
+                    continue
+                # policy didn't exist or returned False or couldn't reschedule
                 if isinstance(res, twisted.python.failure.Failure):
                     zc.async.utils.log.log(
                         self.effective_error_log_level,
                         'Commit failed for %r (see subsequent traceback).  '
-                        'Prior to this, job originally failed with '
-                        'traceback:\n%s',
+                        'Prior to this, job failed with traceback:\n%s',
                         self,
                         res.getTraceback(
                             elideFrameworkCode=True, detail='verbose'))
                 else:
-                    zc.async.utils.tracelog.info(
+                    zc.async.utils.log.info(
                         'Commit failed for %r (see subsequent traceback).  '
                         'Prior to this, job succeeded with result: %r',
                         self, res)
-                res = zc.twist.Failure()
-                tm.abort()
-                retry = self._getRetry('jobError', tm, res, data_cache)
-                if isinstance(retry, (datetime.timedelta, datetime.datetime)):
-                    identifier = (
-                        'rescheduling %r as requested by '
-                        'associated IRetries %r' % (
-                            self, self.retries))
-                    if self is zc.async.utils.never_fail(
-                        lambda: self._reschedule(retry, data_cache),
-                        identifier, tm):
-                        return self
-                elif retry:
-                    continue
-                # retries didn't exist or returned False
+                res = failure
                 def complete():
                     self._result = res
                     self._status = zc.async.interfaces.CALLBACKS
                     self._active_end = datetime.datetime.now(pytz.UTC)
-                    if self.retries is not None:
-                        self.retries.updateData(data_cache)
-                identifier = ('storing failure at commit of %r' % (self,))
+                    policy = self.getRetryPolicy()
+                    if data_cache and self._retry_policy is not None:
+                        self._retry_policy.updateData(data_cache)
+                identifier = 'storing failure at commit of %r' % (self,)
                 zc.async.utils.never_fail(complete, identifier, tm)
-            self._complete(res)
+                callback = True
+            if callback:
+                self._log_completion(res)
+                identifier = 'performing callbacks of %r' % (self,)
+                zc.async.utils.never_fail(self.resumeCallbacks, identifier, tm)
             return res
 
     def handleInterrupt(self):
-        # this is called either within a job (that has a never fail policy)
-        # or withing _resumeCallbacks (that uses never_fail)
-        if self.status is not zc.async.interfaces.ACTIVE:
+        # should be called within a job that has a RetryCommonForever policy
+        tm = transaction.interfaces.ITransactionManager(self)
+        if self.status == zc.async.interfaces.ACTIVE:
+            retry = self._getRetry('interrupted', tm)
+            if isinstance(retry, (datetime.datetime, datetime.timedelta)):
+                self._reschedule(retry, queue=self.queue)
+            elif retry:
+                self._reschedule(datetime.timedelta(), queue=self.queue)
+            else:
+                res = zc.twist.Failure(zc.async.interfaces.AbortedError())
+                if self._set_result(res, tm):
+                    self.resumeCallbacks()
+                self._log_completion(res)
+        elif self.status != zc.async.interfaces.CALLBACKS:
+            # we have to allow CALLBACKS or else some retries will fall over,
+            # because handleInterrupt may fail after a commit of the aborted
+            # error
             raise zc.async.interfaces.BadStatusError(
                 'can only call ``handleInterrupt`` on a job with ACTIVE '
-                'status')
-        tm = transaction.interfaces.ITransactionManager(self)
-        retry = self._getRetry('interrupted', tm)
-        if retry:
-            if not isinstance(retry, (datetime.timedelta, datetime.datetime)):
-                retry = datetime.timedelta()
-            self._reschedule(retry)
+                'status') # um...or CALLBACKS, but that's a secret :-D
         else:
-            self.fail()
+            self.resumeCallbacks()
 
-    def _reschedule(self, retry, data_cache=None):
-        if not zc.async.interfaces.IAgent.providedBy(self.parent):
-            zc.async.utils.log.error(
-                'error for IRetries %r on %r: '
-                'can only reschedule a job directly in an agent',
-                self.retries, self)
-            return None
+    def fail(self, e=None):
+        # something may have fallen over the last time this was called, so we
+        # are careful to only store the error if we're not in the CALLBACKS
+        # status.
+        callback = True
+        status = self.status
+        if status in (zc.async.interfaces.COMPLETED,
+                      zc.async.interfaces.ACTIVE):
+            raise zc.async.interfaces.BadStatusError(
+                'can only call fail on a job with NEW, PENDING, or ASSIGNED '
+                'status') # ...or CALLBACKS, but that's because of
+                # retries, and is semantically incorrect
+        if status != zc.async.interfaces.CALLBACKS:
+            if e is None:
+                e = zc.async.interfaces.TimeoutError()
+            res = zc.twist.Failure(e)
+            callback = self._set_result(
+                res, transaction.interfaces.ITransactionManager(self))
+            self._log_completion(res)
+        if callback:
+            self.resumeCallbacks()
+
+    def _reschedule(self, when, data_cache=None, queue=None):
+        if not isinstance(when, (datetime.datetime, datetime.timedelta)):
+            raise TypeError('``when`` must be datetime or timedelta')
+        in_agent = zc.async.interfaces.IAgent.providedBy(self.parent)
+        if queue is None:
+            # this is a reschedule from jobError or commitError
+            if not in_agent:
+                zc.async.utils.log.critical(
+                    'error for IRetryPolicy %r on %r: '
+                    'can only reschedule a job directly in an agent',
+                    self.getRetryPolicy(), self)
+                return None
+            queue = self.queue
+        if data_cache is not None and self._retry_policy is not None:
+            self._retry_policy.updateData(data_cache)
         self._status = zc.async.interfaces.NEW
-        del self._active_start
-        if data_cache is not None and self.retries is not None:
-            self.retries.updateData(data_cache)
-        self.parent.reschedule(self, retry)
+        self._active_start = None
+        if in_agent:
+            self.parent.remove(self)
+        else:
+            self.parent = None
+        now = datetime.datetime.now(pytz.UTC)
+        if isinstance(when, datetime.datetime):
+            if when.tzinfo is None:
+                when = when.replace(tzinfo=pytz.UTC)
+            if when <= now:
+                queue.putBack(self)
+            else:
+                queue.put(self, begin_after=when)
+        elif isinstance(when, datetime.timedelta):
+            if when <= datetime.timedelta():
+                queue.putBack(self)
+            else:
+                queue.put(self, begin_after=now+when)
         return self
 
-    def _set_result(self, res):
+    def _set_result(self, res, tm, data_cache=None):
+        # returns whether to call ``resumeCallbacks``
+        callback = True
         if zc.async.interfaces.IJob.providedBy(res):
             res.addCallback(self._callback)
+            callback = False
         elif isinstance(res, twisted.internet.defer.Deferred):
             res.addBoth(zc.twist.Partial(self._callback))
-            # XXX need to tell Partial to retry forever
+            # TODO need to tell Partial to retry forever: currently no
+            # spelling for this in zc.twist.  This makes using deferreds
+            # less reliable: _callback *must* be called.
+            callback = False
         else:
             if isinstance(res, twisted.python.failure.Failure):
                 res = zc.twist.sanitize(res)
             self._result = res
             self._status = zc.async.interfaces.CALLBACKS
             self._active_end = datetime.datetime.now(pytz.UTC)
-        if self.retries is not None:
-            self.retries.updateData(data_cache)
+        if self._retry_policy is not None and data_cache:
+            self._retry_policy.updateData(data_cache)
         tm.commit()
+        return callback
 
-    def _complete(self, res):
+    def _log_completion(self, res):
         if isinstance(res, twisted.python.failure.Failure):
             zc.async.utils.log.log(
                 self.effective_error_log_level,
@@ -596,48 +697,62 @@
             zc.async.utils.tracelog.info(
                 '%r succeeded with result: %r',
                 self, res)
-        self.resumeCallbacks()
 
     def _callback(self, res):
         # done within a job or partial, so we can rely on their retry bits to
         # some degree.  However, we commit transactions ourselves, so we have
         # to be a bit careful that the result hasn't been set already.
+        callback = True
         if self._status == zc.async.interfaces.ACTIVE:
-            self._set_result(res)
-        self._complete(res)
+            callback = self._set_result(
+                res, transaction.interfaces.ITransactionManager(self))
+            self._log_completion(res)
+        if callback:
+            self.resumeCallbacks()
 
-    def fail(self, e=None):
-        if e is None:
-            e = zc.async.interfaces.AbortedError()
-        if self._status not in (zc.async.interfaces.NEW,
-                                zc.async.interfaces.ACTIVE):
-            raise zc.async.interfaces.BadStatusError(
-                'can only call fail on a job with NEW, PENDING, ASSIGNED, or '
-                'ACTIVE status')
-        self._complete(zc.twist.Failure(e))
-
     def resumeCallbacks(self):
+        # should be called within a job that has a RetryCommonForever policy
         if self._status != zc.async.interfaces.CALLBACKS:
             raise zc.async.interfaces.BadStatusError(
                 'can only resumeCallbacks on a job with CALLBACKS status')
-        identifier = 'performing callbacks for %r' % (self,)
-        tm = transaction.interfaces.ITransactionManager(self)
-        zc.async.utils.never_fail(self._resumeCallbacks, identifier, tm)
-
-    def _resumeCallbacks(self):
         callbacks = list(self.callbacks)
         tm = transaction.interfaces.ITransactionManager(self)
         length = 0
         while 1:
             for j in callbacks:
+                # TODO yuck: this mucks in callbacks' protected bits
                 if j._status == zc.async.interfaces.NEW:
-                    zc.async.utils.tracelog.debug(
-                        'starting callback %r to %r', j, self)
-                    j(self.result)
+                    if (j.begin_by is not None and
+                        (j.begin_after + j.begin_by) <
+                        datetime.datetime.now(pytz.UTC)):
+                        zc.async.utils.log.error(
+                            'failing expired callback %r to %r', j, self)
+                        j.fail()
+                    else:
+                        zc.async.utils.tracelog.debug(
+                            'starting callback %r to %r', j, self)
+                        j(self.result)
                 elif j._status == zc.async.interfaces.ACTIVE:
-                    zc.async.utils.tracelog.debug(
-                        'failing aborted callback %r to %r', j, self)
-                    j.handleInterrupt()
+                    retry = j._getRetry('interrupted', tm)
+                    istime = isinstance(
+                        retry, (datetime.timedelta, datetime.datetime))
+                    if istime:
+                        zc.async.utils.log.error(
+                            'error for IRetryPolicy %r on %r: '
+                            'cannot reschedule a callback, only retry',
+                            j.getRetryPolicy(), j)
+                    if retry or istime:
+                        zc.async.utils.tracelog.debug(
+                            'retrying interrupted callback '
+                            '%r to %r', j, self)
+                        j._status = zc.async.interfaces.NEW
+                        j._active_start = None
+                        j(self.result)
+                    else:
+                        zc.async.utils.tracelog.debug(
+                            'aborting interrupted callback '
+                            '%r to %r', j, self)
+                        j.fail(zc.async.interfaces.AbortedError())
                 elif j._status == zc.async.interfaces.CALLBACKS:
                     j.resumeCallbacks()
                 # TODO: this shouldn't raise anything we want to catch, right?
@@ -654,4 +769,4 @@
                 if zc.async.interfaces.IAgent.providedBy(self.parent):
                     self.parent.jobCompleted(self)
                 tm.commit()
-
+                return

Modified: zc.async/branches/dev/src/zc/async/job.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/job.txt	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/job.txt	2008-06-17 02:38:06 UTC (rev 87444)
@@ -2,34 +2,34 @@
 Jobs
 ====
 
-What if you want to persist a reference to the method of a persistent
-object--you can't persist that normally in the ZODB, but that can be
-very useful, especially to store asynchronous calls.  What if you want
-to act on the result of an asynchronous call that may be called later?
-The zc.async package offers an approach that combines ideas of a partial
-and that of Twisted deferred code: ``zc.async.job.Job``.  
+What if you want to persist a reference to the method of a persistent object?
+You can't persist that normally in the ZODB, but that can be very useful,
+especially to store asynchronous calls. What if you want to act on the result
+of an asynchronous call that may be called later?
 
-To use it, simply wrap the callable--a method of a persistent object or
-a callable persistent object or a global function--in the job.  You can
-include ordered and keyword arguments to the job, which may be
-persistent objects or simply pickleable objects.
+The zc.async package offers an approach that combines ideas of a partial and
+that of a Twisted deferred: ``zc.async.job.Job``. It has code that is specific
+to zc.async, so it is not truly a general-purpose persistent partial, but this
+file shows the Job largely stand-alone.
 
-Unlike a partial but like a Twisted deferred, the result of the wrapped
-call goes on the job's ``result`` attribute, and the immediate return of
-the call might not be the job's end result.  It could also be a failure,
-indicating an exception; or another partial, indicating that we are
-waiting to be called back by the second partial; or a twisted deferred,
-indicating that we are waiting to be called back by a twisted Deferred
-(see the ``zc.twist``).  After you have the partial, you can then use a
-number of methods and attributes on the partial for further set up. 
-Let's show the most basic use first, though.
+To use a job, simply wrap the callable--a method of a persistent object or a
+callable persistent object or a global function--in the job. You can include
+ordered and keyword arguments to the job, which may be persistent objects or
+simply pickleable objects.
 
-Note that, even though this looks like an interactive prompt, all
-functions and classes defined in this document act as if they were
-defined within a module.  Classes and functions defined in an interactive
-prompt are normally not picklable, and Jobs must work with
-picklable objects [#set_up]_.
+Unlike a partial but like a Twisted deferred, the result of the wrapped call
+goes on the job's ``result`` attribute. It could also be a failure, indicating
+an exception; or, temporarily, None, indicating that we are waiting to be called
+back by a second Job or a Twisted Deferred (see the ``zc.twist`` package).
 
+After you have the job, you can then use a number of methods and attributes
+on the partial for further set up. Let's show the most basic use first, though.
+
+(Note that, even though this looks like an interactive prompt, all functions and
+classes defined in this document act as if they were defined within a module.
+Classes and functions defined in an interactive prompt are normally not
+picklable, and Jobs must work with picklable objects [#set_up]_.)
+
     >>> import zc.async.job
     >>> def call():
     ...     print 'hello world'
@@ -52,8 +52,8 @@
     >>> j.status == zc.async.interfaces.NEW
     True
 
-We can call the job from the NEW (or ASSIGNED, see later) status, and
-then see that the function was called, and see the result on the partial.
+We can call the job from the NEW (or PENDING or ASSIGNED, see later) status,
+and then see that the function was called, and see the result on the partial.
 
     >>> res = j()
     hello world
@@ -69,8 +69,8 @@
     'my result'
 
 In addition to using a global function, we can also use a method of a
-persistent object.  Imagine we have a ZODB root that we can put objects
-in to.
+persistent object. (For this example, imagine we have a ZODB root into which we
+can put objects.)
 
     >>> import persistent
     >>> class Demo(persistent.Persistent):
@@ -117,13 +117,13 @@
     ...
     exceptions.RuntimeError: Bad Things Happened Here
 
-Note that all calls can return a failure explicitly, rather than raising
-an exception that the job converts to an exception.  However, there
-is an important difference in behavior.  If a wrapped call raises an
-exception, the job aborts the transaction; but if the wrapped call
-returns a failure, no abort occurs.  Wrapped calls that explicitly return
-failures are thus responsible for any necessary transaction aborts.  See
-the footnote for an example [#explicit_failure_example]_.
+Note that all calls can return a failure explicitly, rather than raising an
+exception that the job converts to an exception. However, there is an important
+difference in behavior. If a wrapped call raises an exception, the job aborts
+the transaction; but if the wrapped call returns a failure, no automatic abort
+occurs. Wrapped calls that explicitly return failures are thus responsible for
+any necessary transaction aborts. See the footnote for an example
+[#explicit_failure_example]_.
 
 Now let's return a job from the job.  This generally represents a result
 that is waiting on another asynchronous persistent call, which would
@@ -236,23 +236,24 @@
 topic.
 
 Callbacks
----------
+=========
 
-The job object can also be used to handle return values and
-exceptions from the call.  The ``addCallbacks`` method enables the
-functionality.  Its signature is (success=None, failure=None).  It may
-be called multiple times, each time adding a success and/or failure
-callable that takes an end result: a value or a zc.async.Failure object,
-respectively.  Failure objects are passed to failure callables, and
-any other results are passed to success callables.
+The job object can also be used to handle return values and exceptions from the
+call. The ``addCallbacks`` method enables the functionality. Its signature is
+(success=None, failure=None). It may be called multiple times, each time adding
+a success and/or failure callable that takes the end result of the original,
+parent job: a value or a zc.async.Failure object, respectively. Failure objects
+are passed to failure callables, and any other results are passed to success
+callables.
 
-The return value of the success and failure callables is
-important for chains and for determining whether a job had any
-errors that need logging, as we'll see below.  The call to
-``addCallbacks`` returns a job, which can be used for chaining (see
-``Chaining Callbacks``_).
+Note that, unlike with Twisted deferreds, the results of callbacks for a given
+job are not chained.  To chain, add a callback to the desired callback (see the
+sketch below).  The primary use of Twisted's callback chaining is
+try:except:else:finally logic.  In
+most cases, because zc.async jobs are long-running, the try:except logic can be
+accomplished within the code for the job itself.  For certain kinds of
+exceptions, an IRetryPolicy is used instead.
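+
+For instance, given a job ``j``, chaining by hand might look like the following
+sketch, where ``handle_result`` and ``handle_next`` are hypothetical callables
+(the `Chaining Callbacks`_ section below shows working examples)::
+
+    first = j.addCallback(handle_result)      # called with j's result
+    second = first.addCallback(handle_next)   # called with first's result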
 
-Let's look at a simple example.
+Let's look at a simple example of a callback.
 
     >>> def call(*args):
     ...     res = 1
@@ -333,6 +334,7 @@
     failure. [Failure instance: Traceback: exceptions.TypeError...]
     also a failure. [Failure instance: Traceback: exceptions.TypeError...]
 
+------------------
 Chaining Callbacks
 ------------------
 
@@ -384,6 +386,7 @@
     >>> isinstance(j.result, twisted.python.failure.Failure)
     True
 
+--------------------------
 Callbacks on Completed Job
 --------------------------
 
@@ -403,6 +406,7 @@
     >>> j.status == zc.async.interfaces.COMPLETED
     True
 
+-------------
 Chaining Jobs
 -------------
 
@@ -422,33 +426,68 @@
     success! 15
     also a success! 60
 
-Failing
--------
+Failures
+========
 
-Speaking again of failures, it's worth discussing two other aspects of
-failing.  One is that jobs offer an explicit way to fail a call.  It
-can be called when the job has a NEW, PENDING, ASSIGNED or ACTIVE status. 
-The primary use cases for this method are to cancel a job that is
-overdue to start, and to cancel a job that was in progress by a
-worker thread in a dispatcher when the dispatcher died (more on that below).
+Let's talk more about failures.  We'll divide them up into two large types.
 
+- The code you ran in the job failed.
+
+- The system around your code (zc.async code, the ZODB, or even the machine on
+  which the job is running) failed.
+
+If your code fails, it's mostly up to you to deal with it.  There are two
+mechanisms to use: callbacks, which we've already seen, and retry policies,
+which we'll look at below.
+
+If the system fails around your code, four sorts of things might happen. In the
+context of the full zc.async system, it is the responsibility of the zc.async
+code to react as described below, and, when pertinent, your responsibility to
+make sure that the retry policy for the job does what you need.
+
+- The job never started, and timed out. zc.async should call ``fail`` on the
+  job, which aborts the job and begins callbacks.  (This is handled in the
+  queue's ``claim`` method, in the full context of zc.async.)
+
+- The job started but was unable to commit. Internally to the job's machinery,
+  the job uses a retry policy to decide what to do, as described below.
+
+- The job started but then was interrupted. zc.async should call
+  ``handleInterrupt``, in which a retry policy decides what to do, as described
+  below. (This is handled in a DispatcherAgents collection's ``deactivate``
+  method, in the full context of zc.async.)
+
+- The job completed, but the callbacks were interrupted. zc.async should call
+  ``resumeCallbacks``, which will handle the remaining callbacks in the manner
+  described here. (This is handled in a DispatcherAgents collection's
+  ``deactivate`` method, in the full context of zc.async.)
+
+We need to explore a few elements of this, then: the ``fail`` method, retry
+policies, job errors, commit errors, and the ``handleInterrupt`` method.
+
+--------
+``fail``
+--------
+
+The ``fail`` method is an explicit way to fail a job. It can be called when the
+job has a NEW, PENDING, or ASSIGNED status. The use case for this method is to
+cancel a job that is overdue to start.  The error defaults to a TimeoutError.
+
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> transaction.commit()
     >>> j.fail()
     >>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    Traceback (most recent call last):
-    ...
-    zc.async.interfaces.AbortedError:
+    Traceback...zc.async.interfaces.TimeoutError...
 
-``fail`` calls all failure callbacks with the failure.
+``fail`` calls all callbacks with the failure.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> j_callback = j.addCallbacks(failure=failure)
     >>> transaction.commit()
     >>> res = j.fail() # doctest: +ELLIPSIS
-    failure. [Failure instance: Traceback...zc.async.interfaces.AbortedError...]
+    failure. [Failure instance: Traceback...zc.async.interfaces.TimeoutError...]
 
-As seen above, it fails with zc.async.interfaces.AbortedError by default.
+As seen above, it fails with zc.async.interfaces.TimeoutError by default.
 You can also pass in a different error.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
@@ -459,43 +498,856 @@
     ...
     exceptions.RuntimeError: failed
 
-As mentioned, if a dispatcher dies when working on an active task, the
-active task should be aborted using ``fail``, so the method also works if
-a job has the ACTIVE status.  We'll reach under the covers to show this.
+It won't work for failing tasks in ACTIVE or COMPLETED status.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> j._status = zc.async.interfaces.ACTIVE
     >>> transaction.commit()
     >>> j.fail()
-    >>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     Traceback (most recent call last):
     ...
-    zc.async.interfaces.AbortedError:
+    BadStatusError: can only call fail on a job with NEW, PENDING, or ASSIGNED status
 
-It won't work for failing tasks in COMPLETED or CALLBACKS status.
-
+    >>> j._status = zc.async.interfaces.NEW
     >>> j.fail()
+    >>> j.status == zc.async.interfaces.COMPLETED
+    True
+    >>> j.fail()
     Traceback (most recent call last):
     ...
-    BadStatusError: can only call fail on a job with NEW, PENDING, ASSIGNED, or ACTIVE status
+    BadStatusError: can only call fail on a job with NEW, PENDING, or ASSIGNED status
+
+It's a dirty secret that it will work in CALLBACKS status.  This is because we
+need to support retries in the face of internal commits.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> j._status = zc.async.interfaces.CALLBACKS
+    >>> transaction.commit()
     >>> j.fail()
-    Traceback (most recent call last):
+    >>> j.status == zc.async.interfaces.COMPLETED
+    True
+    >>> j.result is None # the default
+    True
+
+--------------
+Retry Policies
+--------------
+
+All of the other failure situations touch on retry policies, directly or
+indirectly.
+
+What is a retry policy?  It is used in three circumstances.
+
+- When the code the job ran fails, the job will try to call
+  ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to whether
+  the job should be retried.
+
+- When the commit fails, the job will try to call
+  ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
+  whether the job should be retried.
+
+- When the job starts but fails to complete because the system is interrupted,
+  the job will try to call ``retry_policy.interrupted()`` to get a boolean as
+  to whether the job should be retried.
+
+Why does this need to be a policy?  Can't it be a simpler arrangement?
+
+The heart of the problem is that different jobs need different error
+resolutions.
+
+In some cases, jobs may not be fully transactional.  For instance, the job
+may be communicating with an external system, such as a credit card system.
+The retry policy here should typically be "never": perhaps a callback should be
+in charge of determining what to do next.
+
+If a job is fully transactional, it can be retried.  But even then the desired
+behavior may differ.
+
+- In typical cases, some errors should simply cause a failure, while other
+  errors, such as database conflict errors, should cause a limited number of
+  retries.
+
+- In some jobs, conflict errors should be retried forever, because the job must
+  be run to completion or else the system should fall over. Callbacks that try
+  to handle errors themselves may take this approach, for instance.
+
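+To make this concrete, here is a minimal sketch of what a policy can look like.
+The class is hypothetical--it is not shipped with zc.async--but it follows the
+same structure as the shipped policies described next::
+
+    import BTrees
+    import persistent
+    import zope.component
+    import zope.interface
+
+    import zc.async.interfaces
+
+    class RetryThreeTimes(persistent.Persistent):
+        # Sketch: retry any job or commit error up to three times, and
+        # always request another attempt after an interruption.
+        zope.component.adapts(zc.async.interfaces.IJob)
+        zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+
+        max_retries = 3
+
+        def __init__(self, job):
+            self.parent = self.__parent__ = job
+            self.data = BTrees.family32.OO.BTree()
+
+        def updateData(self, data_cache):
+            # The job calls this just before a commit, so counts collected
+            # in the ephemeral data_cache survive transaction aborts.
+            self.data.update(data_cache)
+
+        def jobError(self, failure, data_cache):
+            count = data_cache['errors'] = (
+                data_cache.get('errors', self.data.get('errors', 0)) + 1)
+            return count <= self.max_retries
+
+        def commitError(self, failure, data_cache):
+            return self.jobError(failure, data_cache)
+
+        def interrupted(self):
+            return True
+
+A job could use such a policy by setting its ``retry_policy_factory`` to the
+class, as shown below for the shipped policies.
+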
+zc.async currently ships with three retry policies.
+
+1.  The default, appropriate for most fully transactional jobs, is the
+    zc.async.job.RetryCommonFourTimes.
+
+2.  The other available (pre-written) option for transactional jobs is
+    zc.async.job.RetryCommonForever. Callbacks will get this policy by
+    default.
+
+Both of these policies retry ZEO disconnects forever.  The first policy retries
+transaction errors such as conflicts four times (for a total of five attempts)
+and interruptions nine times; the second retries both forever.
+
+3.  The last retry policy is zc.async.job.NeverRetry.  This is appropriate for
+    non-transactional jobs. You'll still typically need to handle errors in
+    your callbacks.
+
+Let's take a detailed look at these three in isolation before seeing them in
+practice below.
+
+We'll start with the default retry policy, RetryCommonFourTimes.  We can get it
+with ``getRetryPolicy``.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+    True
+    >>> verifyObject(zc.async.interfaces.IRetryPolicy, policy)
+    True
+
+Now we'll try out a few calls.  ``jobError`` and ``commitError`` both take a
+failure and a dictionary in which to stash data about the retry.  Before a
+commit, the job passes this data dictionary to the retry policy's
+``updateData`` method.
+
+Here's the policy requesting that a job be tried a total of five times; the
+count is shared between errors raised in the job and errors in the commit.
+
+    >>> import zc.twist
+    >>> import ZODB.POSException
+    >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+    >>> data = {}
+
+    >>> policy.jobError(conflict, data)
+    True
+    >>> policy.jobError(conflict, data)
+    True
+    >>> policy.jobError(conflict, data)
+    True
+    >>> policy.jobError(conflict, data)
+    True
+    >>> policy.jobError(conflict, data)
+    False
+
+Now we've expired the total number of retries for this kind of error, so during
+commit it will fail immediately.
+
+    >>> policy.commitError(conflict, data)
+    False
+
+We'll reset the data (which holds the information until ``updateData`` is
+called to store the information persistently, so that aborted transactions
+don't discard the history from past attempts) and try again.
+
+    >>> data = {}
+    >>> policy.commitError(conflict, data)
+    True
+    >>> policy.commitError(conflict, data)
+    True
+    >>> policy.commitError(conflict, data)
+    True
+    >>> policy.commitError(conflict, data)
+    True
+    >>> policy.commitError(conflict, data)
+    False
+
+A ZEO ClientDisconnected error will be retried forever.  We treat 50 as close
+enough to "forever".  It uses time.sleep to back off.  We'll stub that so we can
+see what happens.
+
+    >>> import time
+    >>> sleep_requests = []
+    >>> def sleep(i):
+    ...     sleep_requests.append(i)
     ...
-    BadStatusError: can only call fail on a job with NEW, PENDING, ASSIGNED, or ACTIVE status
+    >>> old_sleep = time.sleep
+    >>> time.sleep = sleep
 
-Using ``resumeCallbacks``
--------------------------
+    >>> import ZEO.Exceptions
+    >>> disconnect = zc.twist.Failure(ZEO.Exceptions.ClientDisconnected())
 
-So ``fail`` is the proper way to handle an active job that was being
-worked on by on eof a dead dispatcher's worker thread, but how does one
-handle a job that was in the CALLBACKS status?  The answer is to use
-resumeCallbacks.  Any job that is still pending will be called; any
-job that is active will be failed; any job that is in the middle
-of calling its own callbacks will have its ``resumeCallbacks`` called; and
-any job that is completed will be ignored.
+    >>> for i in range(50):
+    ...     if not policy.jobError(disconnect, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    50
 
+Once ``jobError`` has increased the backoff to its maximum, it stays at 60
+seconds for ClientDisconnected errors during commit.
+
+    >>> policy.commitError(disconnect, data)
+    True
+    >>> sleep_requests[50]
+    60
+
+Here's the backoff happening in commitError.
+
+    >>> del sleep_requests[:]
+    >>> data = {}
+    >>> for i in range(50):
+    ...     if not policy.commitError(disconnect, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    50
+
+If we encounter another kind of error, no retries are requested.
+
+    >>> runtime = zc.twist.Failure(RuntimeError())
+    >>> policy.jobError(runtime, data)
+    False
+    >>> policy.commitError(runtime, data)
+    False
+    >>> value = zc.twist.Failure(ValueError())
+    >>> policy.jobError(value, data)
+    False
+    >>> policy.commitError(value, data)
+    False
+
+When the system is interrupted, ``handleInterrupt`` uses the retry policy's
+``interrupted`` method to determine what to do.  It does not need to pass a
+data dictionary.  The default policy will retry nine times, for a total of ten
+attempts.
+
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    True
+    >>> policy.interrupted()
+    False
+
+``updateData`` is only used after ``jobError`` and ``commitError``, and is
+called every time there is a commit attempt, so we're wildly out of order here,
+but we'll call the method now anyway to show we can, and then perform the job.
+
+    >>> transaction.commit()
+    >>> policy.updateData(data)
+    >>> j()
+    10
+
+The policy may also want to perform additional logging.
+
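+For instance, a policy might subclass the default and log each error before
+deferring to the standard behavior.  The following is only an illustrative
+sketch, not a policy shipped with zc.async; ``LoggingRetryPolicy`` is a
+hypothetical name.
+
+    >>> import zc.async.utils
+    >>> class LoggingRetryPolicy(zc.async.job.RetryCommonFourTimes):
+    ...     # hypothetical example: log, then defer to the default behavior
+    ...     def jobError(self, failure, data_cache):
+    ...         zc.async.utils.log.info('retry policy saw job error: %r',
+    ...                                 failure)
+    ...         return zc.async.job.RetryCommonFourTimes.jobError(
+    ...             self, failure, data_cache)
+    ...
+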
+The other policies perform similarly. [#show_other_policies]_
+
+To change a policy on a job, you have two options. First, you can change the
+``retry_policy_factory`` on your instance.
+
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> j.retry_policy_factory = zc.async.job.NeverRetry
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.NeverRetry)
+    True
+
+Second, you can leverage the fact that the default policy tries to adapt the
+job to zc.async.interfaces.IRetryPolicy (with the default name of ''), and only
+if that fails does it use RetryCommonFourTimes.
+
+    >>> import zope.component
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.RetryCommonForever,
+    ...     provides=zc.async.interfaces.IRetryPolicy)
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonForever)
+    True
+
+    >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+    ...     zc.async.job.RetryCommonForever,
+    ...     provided=zc.async.interfaces.IRetryPolicy) # tearDown
+    True
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+    True
+
+If you are working with callbacks, the default retry policy is
+``RetryCommonForever``.
+
+    >>> def foo(result):
+    ...     print result
+    ...
+    >>> callback = j.addCallback(foo)
+    >>> policy = callback.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonForever)
+    True
+
+This can be changed in the same kind of way as a non-callback job.  You can
+set the ``retry_policy_factory``.
+
+    >>> callback = j.addCallback(foo)
+    >>> callback.retry_policy_factory = zc.async.job.NeverRetry
+    >>> policy = callback.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.NeverRetry)
+    True
+
+You can also register an adapter.  In this case it must be an adapter
+registered with the 'callback' name.
+
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.RetryCommonFourTimes,
+    ...     provides=zc.async.interfaces.IRetryPolicy, name='callback')
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> callback = j.addCallback(foo)
+    >>> policy = callback.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+    True
+
+    >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+    ...     zc.async.job.RetryCommonFourTimes,
+    ...     provided=zc.async.interfaces.IRetryPolicy, name='callback') # tearDown
+    True
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> callback = j.addCallback(foo)
+    >>> policy = callback.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonForever)
+    True
+
+Finally, it is worth noting that, typically, once a retry policy has been
+obtained with ``getRetryPolicy``, changing the factory or adapter registration
+will not change the policy: it has already been instantiated and stored on the
+job.
+
+    >>> callback.retry_policy_factory = zc.async.job.NeverRetry
+    >>> isinstance(callback.getRetryPolicy(), zc.async.job.NeverRetry)
+    False
+    >>> policy is callback.getRetryPolicy()
+    True
+
+Now we'll look at the standard retry policies in use as we examine more failure
+scenarios below.
+
+---------------------
+Failures in Your Code
+---------------------
+
+As seen above, failures in your code will generally not be retried by the
+default retry policy; they are simply passed on as a Failure object in the
+job's result.  The only exceptions are ConflictErrors (and other transaction
+errors), which are tried a total of five times, and ClientDisconnected errors,
+which are retried forever.  Let's manufacture some of these problems.
+
+First, a few "normal" errors.  They just go to the result.
+
+    >>> count = 0
+    >>> max = 50
+    >>> def raiseAnError(klass):
+    ...     global count
+    ...     count += 1
+    ...     if count < max:
+    ...         raise klass()
+    ...     else:
+    ...         print 'tried %d times.  stopping.' % (count,)
+    ...         return 42
+    ...
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError, ValueError)
+    >>> transaction.commit()
+    >>> job()
+    <zc.twist.Failure exceptions.ValueError>
+    >>> job.result
+    <zc.twist.Failure exceptions.ValueError>
+    >>> count
+    1
+    >>> count = 0
+
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+    >>> transaction.commit()
+    >>> job()
+    <zc.twist.Failure exceptions.TypeError>
+    >>> job.result
+    <zc.twist.Failure exceptions.TypeError>
+    >>> count
+    1
+    >>> count = 0
+
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError, RuntimeError)
+    >>> transaction.commit()
+    >>> job()
+    <zc.twist.Failure exceptions.RuntimeError>
+    >>> job.result
+    <zc.twist.Failure exceptions.RuntimeError>
+    >>> count
+    1
+    >>> count = 0
+
+If we raise a ConflictError (or any TransactionError), that will try five
+times, and then be let through, so it will look very similar to the previous
+examples except for our ``count`` variable.
+
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+    ...                                    ZODB.POSException.ConflictError)
+    >>> transaction.commit()
+    >>> job()
+    <zc.twist.Failure ZODB.POSException.ConflictError>
+    >>> job.result
+    <zc.twist.Failure ZODB.POSException.ConflictError>
+    >>> count
+    5
+    >>> count = 0
+
+It's worth showing that, as you'd expect, if the job succeeds within the
+allotted retries, everything is fine.
+
+    >>> max = 3
+
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+    ...                                    ZODB.POSException.ConflictError)
+    >>> transaction.commit()
+    >>> job()
+    tried 3 times.  stopping.
+    42
+    >>> job.result
+    42
+    >>> count
+    3
+    >>> count = 0
+
+If we raise a ClientDisconnected error, the policy will retry forever, with a
+backoff.  Our job doesn't actually let it go on forever, but the retry policy
+would.
+
+    >>> max = 50
+    >>> del sleep_requests[:]
+    >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+    ...                                    ZEO.Exceptions.ClientDisconnected)
+    >>> transaction.commit()
+    >>> job()
+    tried 50 times.  stopping.
+    42
+    >>> count
+    50
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    49
+    >>> count = 0
+
+None of the provided retry policies use this functionality, but the pertinent
+retry policy method, ``jobError``, can return not only ``True`` (retry now) or
+``False`` (do not retry) but also a datetime.datetime or a datetime.timedelta.
+Datetimes and timedeltas indicate that the policy requests that the job be
+returned to the queue to be claimed again.  Let's give an example of this.
+
+As implied by this description, this functionality only makes sense within
+a fuller zc.async context.  We need a stub Agent as a job's parent, and a stub
+Queue in its ``parent`` lineage.
+
+    >>> import persistent
+    >>> import zope.interface
+    >>> class StubQueue(persistent.Persistent):
+    ...     zope.interface.implements(zc.async.interfaces.IQueue)
+    ...     def putBack(self, job):
+    ...         self.put_back = job
+    ...         job.parent = self
+    ...     def put(self, job, begin_after=None):
+    ...         self.put_job = job
+    ...         self.put_begin_after = begin_after
+    ...         job.parent = self
+    ...
+    >>> class StubAgent(persistent.Persistent):
+    ...     zope.interface.implements(zc.async.interfaces.IAgent)
+    ...     def __init__(self):
+    ...         # usually would have dispatcher agent parent, which would then
+    ...         # have queue as parent, but this is a stub
+    ...         self.parent = StubQueue()
+    ...     def jobCompleted(self, job):
+    ...         self.completed = job
+    ...     def remove(self, job):
+    ...         job.parent = None
+    ...         self.removed = job
+    ...
+
+We'll also need a quick stub retry policy that takes advantage of this
+functionality.
+
+    >>> import persistent
+    >>> import datetime
+    >>> class StubRescheduleRetryPolicy(persistent.Persistent):
+    ...     zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+    ...     _reply = datetime.timedelta(hours=1)
+    ...     def __init__(self, job):
+    ...         self.parent = self.__parent__ = job
+    ...     def jobError(self, failure, data_cache):
+    ...         return self._reply
+    ...     def commitError(self, failure, data_cache):
+    ...         return self._reply
+    ...     def interrupted(self):
+    ...         return self._reply
+    ...     def updateData(self, data_cache):
+    ...         pass
+    ...
+
+    >>> j = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+    >>> agent = j.parent = StubAgent()
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> transaction.commit()
+    >>> j is j()
+    True
+
+Notice above that, when this happens, the result of the call is the job itself.
+
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> agent.removed is j
+    True
+    >>> agent.parent.put_job is j
+    True
+    >>> agent.parent.put_begin_after
+    datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+    >>> import pytz
+    >>> j = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+    >>> agent = j.parent = StubAgent()
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+    >>> transaction.commit()
+    >>> j is j()
+    True
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> agent.removed is j
+    True
+    >>> agent.parent.put_job is j
+    True
+    >>> agent.parent.put_begin_after
+    datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+------------------
+Failures on Commit
+------------------
+
+Failures on commit are handled very similarly to those occurring while
+performing the job.
+
+We'll have to hack ``TransactionManager.commit`` to show this.
+
+    >>> from transaction import TransactionManager
+    >>> old_commit = TransactionManager.commit
+    >>> commit_count = 0
+    >>> error = None
+    >>> max = 2
+    >>> allow_commits = [1] # change state to active
+    >>> def new_commit(self):
+    ...     global commit_count
+    ...     commit_count += 1
+    ...     if commit_count in allow_commits:
+    ...         old_commit(self) # changing state to "active" or similar
+    ...     elif commit_count - len(allow_commits) < max:
+    ...         raise error()
+    ...     else:
+    ...         if commit_count - len(allow_commits) == max:
+    ...             print 'tried %d time(s).  committing.' % (
+    ...                 commit_count-len(allow_commits))
+    ...         old_commit(self)
+    ...
+    >>> TransactionManager.commit = new_commit
+    >>> count = 0
+    >>> def count_calls():
+    ...     global count
+    ...     count += 1
+    ...     return 42
+    ...
+
+Normal errors at commit time get only one try: the error simply becomes the
+job's result, as a Failure, which is then itself committed.
+
+    >>> error = ValueError
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 2 time(s).  committing.
+    <zc.twist.Failure exceptions.ValueError>
+    >>> count
+    1
+
+Note that, since the error at commit obscures the original result, the
+original result is logged.
+
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'INFO':
+    ...         break
+    ... else:
+    ...     assert False, 'could not find log'
+    ...
+    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Commit failed ... Prior to this, job succeeded with result: 42
+
+This happens both for non-error results (as above) and for error results.
+
+    >>> count = commit_count = 0
+    >>> allow_commits = [1, 2, 3] # set to active, then get retry policy, then call
+    >>> j = root['j'] = zc.async.job.Job(raiseAnError, RuntimeError)
+    >>> transaction.commit()
+    >>> j()
+    tried 2 time(s).  committing.
+    <zc.twist.Failure exceptions.ValueError>
+    >>> found = []
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'ERROR':
+    ...         found.append(r)
+    ...         if len(found) == 2:
+    ...             break
+    ... else:
+    ...     assert False, 'could not find log'
+    ...
+    >>> print found[1].getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Commit failed ... Prior to this, job failed with traceback:
+    ...
+    exceptions.RuntimeError...
+
+The ValueError, from transaction time, is the main error recorded.
+
+    >>> print found[0].getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    <...Job...raiseAnError... failed with traceback:
+    ...
+    exceptions.ValueError...
+
+The conflict error will be tried five times.
+
+    >>> max = 6
+    >>> count = commit_count = 0
+    >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13] # set active state, then get retries
+    >>> error = ZODB.POSException.ConflictError
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 6 time(s).  committing.
+    <zc.twist.Failure ZODB.POSException.ConflictError>
+    >>> count
+    5
+
+If it succeeds in the number of tries allotted, the result will be fine.
+
+    >>> max = 3
+    >>> count = commit_count = 0
+    >>> allow_commits = [1, 3, 4, 6, 7]
+    >>> error = ZODB.POSException.ConflictError
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 3 time(s).  committing.
+    42
+    >>> count
+    3
+
+As an aside, the code regards obtaining the desired retry policy as a
+requirement, and will retry that commit until it succeeds.  Here's a repeat of
+the previous example, in which some of those retries themselves hit a failed
+commit.  Every error except a ConflictError triggers a backoff; since the
+error here is a ConflictError, no sleeps are requested.
+
+    >>> max = 25
+    >>> count = commit_count = 0
+    >>> allow_commits = [1]
+    >>> error = ZODB.POSException.ConflictError
+    >>> del sleep_requests[:]
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 25 time(s).  committing.
+    42
+    >>> count
+    2
+    >>> sleep_requests
+    []
+
+A ClientDisconnected error will be retried forever, with a backoff, until the
+commit succeeds.  We're putting in an irregular pattern of commits to test
+that these also work, which skews some of the counts.
+
+    >>> max = 50
+    >>> count = commit_count = 0
+    >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 22, 25]
+    >>> del sleep_requests[:]
+    >>> error = ZEO.Exceptions.ClientDisconnected
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 50 time(s).  committing.
+    42
+    >>> count
+    9
+    >>> len(sleep_requests)
+    51
+
+None of the provided retry policies use this functionality, but the pertinent
+retry policy method (here ``commitError``) can return not only ``True`` (retry
+now) or ``False`` (do not retry) but also a datetime.datetime or a
+datetime.timedelta.  Datetimes and timedeltas indicate that the policy
+requests that the job be returned to the queue to be claimed again.  Let's
+give an example of this in the context of commit errors.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> agent = j.parent = StubAgent()
+    >>> StubRescheduleRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> count = commit_count = 0
+    >>> max = 1
+    >>> error = ValueError
+    >>> transaction.commit()
+    >>> j is j()
+    True
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> agent.removed is j
+    True
+    >>> agent.parent.put_job is j
+    True
+    >>> agent.parent.put_begin_after
+    datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> agent = j.parent = StubAgent()
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+    >>> count = commit_count = 0
+    >>> transaction.commit()
+    >>> j is j()
+    True
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> agent.removed is j
+    True
+    >>> agent.parent.put_job is j
+    True
+    >>> agent.parent.put_begin_after
+    datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+All of this looks very similar to job errors.  The only big difference between
+the behavior of job errors and commit errors is that committing a Failure
+result *must* succeed: the commit will be retried forever until it succeeds or
+the job is interrupted.  Here's an example with a ConflictError.
+
+    >>> count = commit_count = 0
+    >>> max = 50
+    >>> error = ZODB.POSException.ConflictError
+    >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13]
+    >>> j = root['j'] = zc.async.job.Job(count_calls)
+    >>> transaction.commit()
+    >>> j()
+    tried 50 time(s).  committing.
+    <zc.twist.Failure ZODB.POSException.ConflictError>
+    >>> count
+    5
+
+    >>> TransactionManager.commit = old_commit
+
+    >>> time.sleep = old_sleep # tearDown
+
+-------------------
+``handleInterrupt``
+-------------------
+
+Not infrequently, systems will be stopped while jobs are in the middle of their
+work.  When the clean-up occurs, the job needs to figure out what to do to
+handle the interruption.  The right thing to do is to call ``handleInterrupt``.
+This method itself defers to a retry policy to determine what to do.
+
+This method takes no arguments.  The default policy retries up to nine
+interruptions; on the tenth, the job is failed with an ``AbortedError``.  The
+method expects a parent agent (or queue) to reschedule the job in a queue.
+Let's give an example.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> queue = j.parent = StubQueue()
+    >>> j._status = zc.async.interfaces.ACTIVE
+    >>> transaction.commit()
+    >>> j.handleInterrupt()
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> queue.put_back is j
+    True
+
+That was the first interruption.  After eight more (nine in all), the default
+policy gives up on the tenth.
+
+    >>> for i in range(8):
+    ...     j.parent = agent
+    ...     j._status = zc.async.interfaces.ACTIVE
+    ...     j.handleInterrupt()
+    ...     if j.status != zc.async.interfaces.PENDING:
+    ...         print 'error', i, j.status
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> j.parent = agent
+    >>> j._status = zc.async.interfaces.ACTIVE
+    >>> j.handleInterrupt()
+    >>> j.status == zc.async.interfaces.COMPLETED
+    True
+    >>> j.result
+    <zc.twist.Failure zc.async.interfaces.AbortedError>
+
+If the retry policy returns True, as happens above, the agent and queue should
+interpret this as a request to reschedule the job immediately.  The policy can
+also return a datetime.timedelta or datetime.datetime to ask that the job be
+rescheduled later.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> queue = j.parent = StubQueue()
+    >>> StubRescheduleRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> j._status = zc.async.interfaces.ACTIVE
+    >>> transaction.commit()
+    >>> j.handleInterrupt()
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> queue.put_job is j
+    True
+    >>> queue.put_begin_after
+    datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> queue = j.parent = StubQueue()
+    >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+    >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+    >>> j._status = zc.async.interfaces.ACTIVE
+    >>> transaction.commit()
+    >>> j.handleInterrupt()
+    >>> j.status == zc.async.interfaces.PENDING
+    True
+    >>> queue.put_job is j
+    True
+    >>> queue.put_begin_after
+    datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+In a full zc.async context, rescheduling is more of a dance between the job,
+the agent, and the queue.  See `catastrophes.txt` for more information.
+
+-------------------
+``resumeCallbacks``
+-------------------
+
+``handleInterrupt`` should be called when a job was working on its own code.
+But what happens if the system stops while a job was working on its callbacks?
+When the job is handled, the system should call ``resumeCallbacks``.  The
+method will call any callback that is still pending and not timed out because
+of ``begin_by``; it will call ``fail`` on any callback that is pending and
+timed out; it will call ``handleInterrupt`` on any callback that is active; it
+will call ``resumeCallbacks`` on any callback that itself has the CALLBACKS
+status; and it will ignore any callback that is completed.  As such, it
+encompasses all of the retry behavior discussed above.  Moreover, while
+``resumeCallbacks`` does not use retry policies directly, the callback jobs it
+works with often do.  The decision logic is sketched just below.
+
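+Roughly, the logic described above looks like the following sketch.  This is
+not the actual implementation in ``zc.async.job``; ``timed_out`` is a
+hypothetical stand-in for the ``begin_by`` check.
+
+    >>> def sketch_resumeCallbacks(job, timed_out):
+    ...     # a sketch only: walk the job's callbacks and dispatch on status
+    ...     for callback in job.callbacks:
+    ...         status = callback.status
+    ...         if status == zc.async.interfaces.PENDING:
+    ...             if timed_out(callback):  # begin_by deadline has passed
+    ...                 callback.fail()
+    ...             else:
+    ...                 callback()
+    ...         elif status == zc.async.interfaces.ACTIVE:
+    ...             callback.handleInterrupt()
+    ...         elif status == zc.async.interfaces.CALLBACKS:
+    ...             callback.resumeCallbacks()
+    ...         # COMPLETED callbacks are ignored
+    ...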
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
     >>> j._result = 10
     >>> j._status = zc.async.interfaces.CALLBACKS
     >>> completed_j = zc.async.job.Job(multiply, 3)
@@ -517,11 +1369,8 @@
     80
     >>> sub_callbacks_j.status == zc.async.interfaces.COMPLETED
     True
-    >>> print active_j.result.getTraceback()
-    ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    Traceback (most recent call last):
-    ...
-    zc.async.interfaces.AbortedError:
+    >>> active_j.result # was retried because of default retry policy
+    50
     >>> active_j.status == zc.async.interfaces.COMPLETED
     True
     >>> pending_j.result
@@ -529,8 +1378,12 @@
     >>> pending_j.status == zc.async.interfaces.COMPLETED
     True
 
-[#aborted_active_callback_log]_
+[#retried_active_callback_log]_
 
+Introspection
+=============
+
+------------------------------------
 Introspecting and Mutating Arguments
 ------------------------------------
 
@@ -567,6 +1420,7 @@
     >>> res = j() # doctest: +ELLIPSIS
     <zc.async.job.Job (oid ...>
 
+-----------------
 Result and Status
 -----------------
 
@@ -673,6 +1527,7 @@
 - Also, a job's direct callback may not call the job
   [#callback_self]_.
 
+----------------------
 More Job Introspection
 ----------------------
 
@@ -769,6 +1624,8 @@
     True
     >>> transaction.abort()
 
+    >>> time.sleep = old_sleep # probably put in test tearDown
+
 =========
 Footnotes
 =========
@@ -783,15 +1640,17 @@
     >>> root = conn.root()
     >>> import zc.async.configure
     >>> zc.async.configure.base()
+    >>> import zc.async.testing
+    >>> zc.async.testing.setUpDatetime() # pins datetimes
 
 .. [#verify] Verify interface
 
     >>> from zope.interface.verify import verifyObject
     >>> verifyObject(zc.async.interfaces.IJob, j)
     True
-    
+
     Note that status and result are readonly.
-    
+
     >>> j.status = 1
     Traceback (most recent call last):
     ...
@@ -804,9 +1663,9 @@
 .. [#no_live_frames] Failures have two particularly dangerous bits: the
     traceback and the stack.  We use the __getstate__ code on Failures
     to clean them up.  This makes the traceback (``tb``) None...
-    
+
     >>> j.result.tb # None
-    
+
     ...and it makes all of the values in the stack--the locals and
     globals-- into strings.  The stack is a list of lists, in which each
     internal list represents a frame, and contains five elements: the
@@ -814,14 +1673,14 @@
     the line number (``f_lineno``), an items list of the locals, and an
     items list for the globals.  All of the values in the items list
     would normally be objects, but are now strings.
-    
+
     >>> for (codename, filename, lineno, local_i, global_i) in j.result.stack:
     ...     for k, v in local_i:
     ...         assert isinstance(v, basestring), 'bad local %s' % (v,)
     ...     for k, v in global_i:
     ...         assert isinstance(v, basestring), 'bad global %s' % (v,)
     ...
-    
+
     Here's a reasonable question.  The Twisted Failure code has a
     __getstate__ that cleans up the failure, and that's even what we are
     using to sanitize the failure.  If the failure is attached to a
@@ -845,7 +1704,7 @@
     desired changes to the transaction (such as aborting) before the
     job calls commit.  Compare.  Here is a call that raises an
     exception, and rolls back changes.
-    
+
     (Note that we are passing arguments to the job, a topic that has
     not yet been discussed in the text when this footnote is given: read
     on a bit in the main text to see the details, if it seems surprising
@@ -942,14 +1801,16 @@
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    <zc.async.job.Job (oid 114, db 'unnamed')
+    <zc.async.job.Job (oid ..., db 'unnamed')
      ``zc.async.job.completeStartedJobArguments(zc.async.job.Job
-                                                (oid 109, db 'unnamed'))``>
+                                                (oid ..., db 'unnamed'))``>
     succeeded with result:
     None
 
-    However, the callbacks that fail themselves are logged (again, as all jobs
-    are) at the "error" level and include a detailed traceback.
+    However, callbacks that themselves fail are logged (by default at the
+    "CRITICAL" level, because callbacks are often in charge of error handling,
+    and an error in your error handler may be a serious problem) and include a
+    detailed traceback.
 
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 4)
     >>> transaction.commit()
@@ -957,8 +1818,8 @@
     >>> j() # doctest: +ELLIPSIS
     20
 
-    >>> for r in reversed(trace_logs.records):
-    ...     if r.levelname == 'ERROR':
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'CRITICAL':
     ...         break
     ... else:
     ...     assert False, 'could not find log'
@@ -966,29 +1827,204 @@
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     <...Job...``zc.async.doctest_test.multiply()``> failed with traceback:
     *--- Failure #... (pickled) ---
-    .../zc/async/job.py:...
+    ...job.py:...
      [ Locals...
-      res : 'None...
-     ( Globals...
-    .../zc/async/job.py:...
-     [ Locals...
       effective_args : '[20]...
      ( Globals...
     exceptions.TypeError: multiply() takes at least 2 arguments (1 given)
     *--- End of Failure #... ---
     <BLANKLINE>
 
-.. [#aborted_active_callback_log] The fact that the pseudo-aborted "active"
-    job failed is logged.
+.. [#show_other_policies]
 
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> j.retry_policy_factory = zc.async.job.RetryCommonForever
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.RetryCommonForever)
+    True
+
+    Here's the policy requesting that a job be tried "forever" (we show 50
+    times), both in the job and in the commit.
+
+    >>> import zc.twist
+    >>> import ZODB.POSException
+    >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+    >>> data = {}
+
+    >>> for i in range(50):
+    ...     if not policy.jobError(conflict, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    >>> for i in range(50):
+    ...     if not policy.commitError(conflict, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    A ZEO ClientDisconnected error will also be retried forever, but with a
+    backoff.
+
+    >>> import ZEO.Exceptions
+    >>> disconnect = zc.twist.Failure(ZEO.Exceptions.ClientDisconnected())
+    >>> del sleep_requests[:]
+
+    >>> for i in range(50):
+    ...     if not policy.jobError(disconnect, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    50
+    >>> del sleep_requests[:]
+
+    >>> data = {}
+    >>> for i in range(50):
+    ...     if not policy.commitError(disconnect, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+    >>> sleep_requests # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+    >>> len(sleep_requests)
+    50
+
+    If we encounter another kind of error during the job, no retries are
+    requested.
+
+    >>> runtime = zc.twist.Failure(RuntimeError())
+    >>> policy.jobError(runtime, data)
+    False
+    >>> value = zc.twist.Failure(ValueError())
+    >>> policy.jobError(value, data)
+    False
+
+    However, during the commit, any kind of error will cause a retry, forever.
+
+    >>> for i in range(50):
+    ...     if not policy.commitError(runtime, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    >>> for i in range(50):
+    ...     if not policy.commitError(value, data):
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    When the system is interrupted, ``handleInterrupt`` uses the retry policy's
+    ``interrupted`` method to determine what to do.  It does not need to pass a
+    data dictionary.  This policy also retries interruptions forever.
+
+    >>> for i in range(50):
+    ...     if not policy.interrupted():
+    ...         print 'error'
+    ...         break
+    ... else:
+    ...     print 'success'
+    ...
+    success
+
+    ``updateData`` is only used after ``jobError`` and ``commitError``, and is
+    called every time there is a commit attempt, so we're wildly out of order here,
+    but we'll call the method now anyway to show we can, and then perform the job.
+
+    >>> transaction.commit()
+    >>> policy.updateData(data)
+    >>> j()
+    10
+
+    Now we will show ``NeverRetry``.  As the name implies, this is a simple
+    policy for jobs that should never retry.  Typical reasons for this are
+    that the job is talking to an external system or doing something else that
+    is effectively not transactional.  More sophisticated policies for
+    specific versions of this scenario may be possible; however, also consider
+    callbacks for this use case.
+
+    >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+    >>> j.retry_policy_factory = zc.async.job.NeverRetry
+    >>> policy = j.getRetryPolicy()
+    >>> isinstance(policy, zc.async.job.NeverRetry)
+    True
+
+    So, here's a bunch of "no"s.
+
+    >>> import zc.twist
+    >>> import ZODB.POSException
+    >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+    >>> data = {}
+
+    >>> policy.jobError(conflict, data)
+    False
+
+    >>> policy.commitError(conflict, data)
+    False
+
+    >>> policy.jobError(disconnect, data)
+    False
+
+    >>> policy.commitError(disconnect, data)
+    False
+
+    >>> runtime = zc.twist.Failure(RuntimeError())
+    >>> policy.jobError(runtime, data)
+    False
+
+    >>> policy.commitError(runtime, data)
+    False
+
+    >>> value = zc.twist.Failure(ValueError())
+    >>> policy.jobError(value, data)
+    False
+
+    >>> policy.commitError(value, data)
+    False
+
+    >>> policy.interrupted()
+    False
+
+    ``updateData`` is only used after ``jobError`` and ``commitError``, and is
+    called every time there is a commit attempt, so we're wildly out of order here,
+    but we'll call the method now anyway to show we can, and then perform the job.
+
+    >>> transaction.commit()
+    >>> policy.updateData(data)
+    >>> j()
+    10
+
+.. [#retried_active_callback_log] The fact that the pseudo-aborted "active"
+    job was retried is logged.
+
     >>> for r in reversed(trace_logs.records):
-    ...     if r.levelname == 'DEBUG' and r.getMessage().startswith('failing'):
+    ...     if (r.levelname == 'DEBUG' and
+    ...         r.getMessage().startswith('retrying interrupted')):
     ...         break
     ... else:
     ...     assert False, 'could not find log'
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
-    failing aborted callback
+    retrying interrupted callback
     <...Job...``zc.async.doctest_test.multiply(5)``> to <...Job...>
 
 .. [#call_self] Here's a job trying to call itself.

Modified: zc.async/branches/dev/src/zc/async/queue.py
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.py	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/queue.py	2008-06-17 02:38:06 UTC (rev 87444)
@@ -13,6 +13,7 @@
 ##############################################################################
 import datetime
 import bisect
+import logging
 import pytz
 import persistent
 import persistent.interfaces
@@ -23,6 +24,7 @@
 import zope.component
 import zope.event
 import zope.bforest
+import zope.minmax
 import zc.queue
 import zc.dict
 
@@ -43,22 +45,20 @@
 
     UUID = None
     activated = None
-    
+
     def __init__(self, uuid):
         super(DispatcherAgents, self).__init__()
         self.UUID = uuid
-        self.__class__.last_ping.initialize(self)
-    
-    zc.async.utils.createAtom('last_ping', None)
-    
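+        # zope.minmax.Maximum resolves concurrent ZODB writes by keeping the
+        # largest value, so frequent pings do not cause conflict errors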
+        self.last_ping = zope.minmax.Maximum()
+
     ping_interval = datetime.timedelta(seconds=30)
     ping_death_interval = datetime.timedelta(seconds=60)
 
     @property
     def dead(self):
-        last_ping = self.last_ping
+        last_ping = self.last_ping.value
         if self.activated and (
-            self.last_ping is None or self.activated > self.last_ping):
+            last_ping is None or self.activated > last_ping):
             last_ping = self.activated
         elif last_ping is None:
             return False
@@ -103,27 +103,43 @@
             else:
                 while job is not None:
                     status = job.status
-                    if status == zc.async.interfaces.ASSIGNED:
+                    if status in (zc.async.interfaces.PENDING,
+                                  zc.async.interfaces.ASSIGNED):
+                        # odd
+                        zc.async.utils.log.warning(
+                            'unexpected job status %s for %r; treating as NEW',
+                            status, job)
+                        status = zc.async.interfaces.NEW
+                    if status == zc.async.interfaces.NEW:
                         tmp = job.assignerUUID
                         job.assignerUUID = None
                         job.parent = None
                         queue.put(job)
                         job.assignerUUID = tmp
                     elif job.status == zc.async.interfaces.ACTIVE:
-                        queue.put(job.handleInterrupt)
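+                        # interrupted work must eventually be resolved, so
+                        # retry forever and log failures as critical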
+                        j = queue.put(
+                            job.handleInterrupt,
+                            retry_policy_factory=zc.async.job.RetryCommonForever,
+                            error_log_level=logging.CRITICAL)
                     elif job.status == zc.async.interfaces.CALLBACKS:
-                        queue.put(job.resumeCallbacks)
+                        j = queue.put(
+                            job.resumeCallbacks,
+                            retry_policy_factory=zc.async.job.RetryCommonForever,
+                            error_log_level=logging.CRITICAL)
                     elif job.status == zc.async.interfaces.COMPLETED:
                         # huh, that's odd.
                         agent.completed.add(job)
+                        zc.async.utils.log.warning(
+                            'unexpectedly had to inform agent of completion '
+                            'of %r', job)
                     try:
                         job = agent.pull()
                     except IndexError:
                         job = None
         zope.event.notify(
             zc.async.interfaces.DispatcherDeactivated(self))
-        
 
+
 class Queues(zc.async.utils.Dict):
 
     def __setitem__(self, key, value):
@@ -161,9 +177,10 @@
         if not da.activated:
             raise ValueError('UUID is not activated.')
         now = datetime.datetime.now(pytz.UTC)
-        if (da.last_ping is None or
-            da.last_ping + da.ping_interval <= now):
-            da.last_ping = now
+        last_ping = da.last_ping.value
+        if (last_ping is None or
+            last_ping + da.ping_interval <= now):
+            da.last_ping.value = now
         next = self._getNextActiveSibling(uuid)
         if next is not None and next.dead:
             # `next` seems to be a dead dispatcher.
@@ -188,28 +205,37 @@
         self.size = size
 
     def clean(self):
+        now = datetime.datetime.now(pytz.UTC)
         for i, job in enumerate(reversed(self._data)):
-            if job.status in (
-                zc.async.interfaces.CALLBACKS,
-                zc.async.interfaces.COMPLETED):
+            status = job.status
+            if status in (zc.async.interfaces.CALLBACKS,
+                          zc.async.interfaces.COMPLETED) or (
+                status == zc.async.interfaces.PENDING and
+                job.begin_after > now): # for a rescheduled task
                 self._data.pull(-1-i)
 
     @property
     def filled(self):
         return len(self._data) >= self.size
 
+    def __contains__(self, item):
+        for i in self:
+            if i is item:
+                return True
+        return False
+
     def add(self, item):
+        if item in self:
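+            # a job put back after an interruption may already hold its place
+            # in this quota; re-adding it would duplicate it, so do nothing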
+            return
         if not zc.async.interfaces.IJob.providedBy(item):
             raise ValueError('must be IJob')
         if self.name not in item.quota_names:
             raise ValueError('quota name must be in quota_names')
-        # self.clean()
         if self.filled:
             raise ValueError('Quota is filled')
         self._data.put(item)
 
-    for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'get', 
-               '__contains__'):
+    for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'get'):
         locals()[nm] = zc.async.utils.simpleWrapper(nm)
 
 
@@ -229,6 +255,8 @@
 class Queue(zc.async.utils.Base):
     zope.interface.implements(zc.async.interfaces.IQueue)
 
+    _putback_queue = None
+
     def __init__(self):
         self._queue = zc.queue.CompositeQueue()
         self._held = BTrees.OOBTree.OOBTree()
@@ -239,13 +267,13 @@
         self.dispatchers.__parent__ = self
 
     def put(self, item, begin_after=None, begin_by=None,
-            error_log_level=None, retry_policy=None):
+            error_log_level=None, retry_policy_factory=None):
         item = zc.async.interfaces.IJob(item)
         if error_log_level is not None:
             item.error_log_level = error_log_level
-        if retry_policy is not None:
-            item.retry_policy = retry_policy
-        if item.assignerUUID is not None:
+        if retry_policy_factory is not None:
+            item.retry_policy_factory = retry_policy_factory
+        if item.status != zc.async.interfaces.NEW:
             raise ValueError(
                 'cannot add already-assigned job')
         for name in item.quota_names:
@@ -258,10 +286,9 @@
             item.begin_after = now
         if begin_by is not None:
             item.begin_by = begin_by
-        elif item.begin_by is None:
-            item.begin_by = datetime.timedelta(hours=1) # good idea?
-        item.assignerUUID = zope.component.getUtility(
-            zc.async.interfaces.IUUID)
+        if item.assignerUUID is not None: # rescheduled job keeps old UUID
+            item.assignerUUID = zope.component.getUtility(
+                zc.async.interfaces.IUUID)
         if item._p_jar is None:
             # we need to do this if the job will be stored in another
             # database as well during this transaction.  Also, _held storage
@@ -279,7 +306,35 @@
         self._length.change(1)
         return item
 
+    def putBack(self, item):
+        # an agent has claimed a job, but now the job needs to be returned. the
+        # only current caller for this is a job's ``handleInterrupt`` method.
+        # The scenario for this is that the agent's dispatcher died while the
+        # job was active, interrupting the work; and the job's retry policy
+        # asks that the job be put back on the queue to be claimed immediately.
+        # This method puts the job in a special internal queue that ``_iter``
+        # looks at first. This allows jobs to maintain their order, if needed,
+        # within a quota.
+        assert zc.async.interfaces.IJob.providedBy(item)
+        assert item.status == zc.async.interfaces.NEW, item.status
+        assert item.begin_after is not None
+        assert item._p_jar is not None
+        # to support legacy instances of the queue that were created before
+        # this functionality and its separate internal data structure were
+        # part of the code, we instantiate the _putback_queue when we first
+        # need it, here.
+        if self._putback_queue is None:
+            self._putback_queue = zc.queue.CompositeQueue()
+        self._putback_queue.put(item)
+        item.parent = self
+        self._length.change(1)
+
     def _iter(self):
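+        # jobs returned via putBack are offered first (see the comment in
+        # putBack above)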
+        putback_queue = self._putback_queue
+        if putback_queue: # not None and not empty
+            dq_pop = putback_queue.pull
+            for dq_ix, dq_next in enumerate(putback_queue):
+                yield dq_pop, dq_ix, dq_next
         queue = self._queue
         tree = self._held
         q = enumerate(queue)
@@ -336,16 +391,17 @@
         if not self._length():
             return default
         uuid = None
+        quotas_cleaned = set()
         for pop, ix, job in self._iter():
             if job.begin_after > now:
                 break
             res = None
             quotas = []
-            if (job.begin_after + job.begin_by) < now:
-                res = zc.async.interfaces.IJob(
-                        job.fail) # specify TimeoutError?
+            if (job.begin_by is not None and
+                (job.begin_after + job.begin_by) < now):
+                res = zc.async.interfaces.IJob(job.fail)
+                res.args.append(zc.async.interfaces.TimeoutError())
                 res.begin_after = now
-                res.begin_by = datetime.timedelta(hours=1)
                 res.parent = self
                 if uuid is None:
                     uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
@@ -354,8 +410,10 @@
                 for name in job.quota_names:
                     quota = self.quotas.get(name)
                     if quota is not None:
-                        quota.clean()
-                        if quota.filled:
+                        if name not in quotas_cleaned:
+                            quota.clean()
+                            quotas_cleaned.add(name)
+                        if quota.filled and job not in quota:
                             break
                         quotas.append(quota)
                 else:

Modified: zc.async/branches/dev/src/zc/async/queue.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.txt	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/queue.txt	2008-06-17 02:38:06 UTC (rev 87444)
@@ -24,7 +24,7 @@
 [#queues_collection]_ As shown in the README.txt of this package (or see
 zc.async.adapters.defaultQueueAdapter), the queue with the name '' will
 typically be registered as an adapter to persistent objects that
-provides zc.async.interfaces.IQueue [#verify]_. 
+provides zc.async.interfaces.IQueue [#verify]_.
 
 The queue doesn't have any jobs yet.
 
@@ -82,25 +82,27 @@
     >>> job.parent is queue
     True
     >>> transaction.commit()
-    
+
 A job added without any special calls gets a `begin_after` attribute
 of now.
 
     >>> import datetime
     >>> import pytz
-    >>> now = datetime.datetime.now(pytz.UTC) 
+    >>> now = datetime.datetime.now(pytz.UTC)
     >>> now
     datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=<UTC>)
     >>> job.begin_after == now
     True
 
-A ``begin_by`` attribute is a duration, and defaults to one hour.  This
-means that it must be completed an hour after the ``begin_after`` datetime,
-or else the system will fail it.
+A ``begin_by`` attribute is a duration (datetime.timedelta) or None, and
+defaults to None.
 
-    >>> job.begin_by == datetime.timedelta(hours=1)
+    >>> job.begin_by == None
     True
 
+If ``begin_by`` were an hour, that would mean that the job must be completed
+an hour after the ``begin_after`` datetime, or else the system would fail it.
+
 Now let's add a job to be performed later, using ``begin_after``.
 
 This means that it's immediately ready to be performed: we can ``claim`` it.
@@ -240,9 +242,9 @@
 - If begin_after + begin_by >= now, a job that makes the original job fail
   is used instead.
 
-- If a job has one or more ``quota_names`` and the associated quotas are
-  filled with jobs not in the CALLBACKS or COMPLETED status then it will
-  not be returned.
+- If a job is not failing and has one or more ``quota_names`` and the
+  associated quotas are filled with jobs not in the CALLBACKS or COMPLETED
+  status then it will not be returned.
 
 - It does not take an index argument.
 
@@ -391,22 +393,46 @@
     ...     @property
     ...     def queue(self):
     ...         return self.parent.parent
-    ...     def claimJob(self):
+    ...     def claimJob(self, filter=None):
     ...         if len(self) < self.size:
-    ...             job = self.queue.claim()
+    ...             job = self.queue.claim(filter=filter)
     ...             if job is not None:
     ...                 job.parent = self
     ...                 self.append(job)
     ...                 return job
-    ...     def pull(self):
-    ...         return self.pop(0)
+    ...     def pull(self, index=0):
+    ...         res = self.pop(index)
+    ...         res.parent = None
+    ...         return res
+    ...     def remove(self, item):
+    ...         self.pull(self.index(item))
     ...     def jobCompleted(self, job):
+    ...         persistent.list.PersistentList.remove(self, job)
     ...         self.completed.add(job)
-    ...         
+    ...     def reschedule(self, item, when):
+    ...         now = datetime.datetime.utcnow()
+    ...         if isinstance(when, datetime.datetime):
+    ...             self.remove(item)
+    ...             if when.tzinfo is not None:
+    ...                 when = when.astimezone(pytz.UTC).replace(tzinfo=None)
+    ...             if when <= now:
+    ...                 self.queue.disclaim(item)
+    ...             else:
+    ...                 self.queue.put(item, begin_after=when)
+    ...         elif isinstance(when, datetime.timedelta):
+    ...             self.remove(item)
+    ...             if when <= datetime.timedelta():
+    ...                 self.queue.disclaim(item)
+    ...             else:
+    ...                 self.queue.put(item, begin_after=now+when)
+    ...         else:
+    ...             raise TypeError('``when`` must be datetime or timedelta')
+    ...
 
     >>> job4 is queue.claim()
     True
     >>> job4.parent = StubAgent()
+    >>> job4.parent.append(job4)
     >>> job4.status == zc.async.interfaces.ASSIGNED
     True
     >>> list(quota) == [job4]
@@ -426,23 +452,25 @@
 timed out are returned in wrappers that fail the original job.
 
     >>> job9_from_outer_space = queue.put(mock_work)
+    >>> job9_from_outer_space.begin_by = datetime.timedelta(hours=1)
     >>> zc.async.testing.set_now(
     ...     datetime.datetime.now(pytz.UTC) +
     ...     job9_from_outer_space.begin_by + datetime.timedelta(seconds=1))
     >>> job9 = queue.claim()
     >>> job9 is job9_from_outer_space
     False
-    >>> stub = root['stub'] = StubAgent() 
+    >>> stub = root['stub'] = StubAgent()
     >>> job9.parent = stub
+    >>> stub.append(job9)
     >>> transaction.commit()
     >>> job9()
     >>> job9_from_outer_space.status == zc.async.interfaces.COMPLETED
     True
     >>> print job9_from_outer_space.result.getTraceback()
+    ... # doctest: +NORMALIZE_WHITESPACE
     Traceback (most recent call last):
-    Failure: zc.async.interfaces.AbortedError: 
+    Failure: zc.async.interfaces.TimeoutError:
     <BLANKLINE>
-    
 
 Dispatchers
 ===========
@@ -492,12 +520,12 @@
 
     >>> print da.activated
     None
-    >>> print da.last_ping
+    >>> print da.last_ping.value
     None
 
-When the object's ``last_ping`` + ``ping_interval`` is greater than now,
-a new ``last_ping`` should be recorded, as we'll see below.  If the
-``last_ping`` (or ``activated``, if more recent) +
+When the object's ``last_ping.value`` + ``ping_interval`` is older than now,
+a new ``last_ping.value`` should be recorded, as we'll see below.  If the
+``last_ping.value`` (or ``activated``, if more recent) +
 ``ping_death_interval`` is older than now, the dispatcher is considered to
 be ``dead``.
 
@@ -514,7 +542,7 @@
     >>> import pytz
     >>> now = datetime.datetime.now(pytz.UTC)
     >>> da.activate()
-    >>> now <= da.activated <= datetime.datetime.now(pytz.UTC) 
+    >>> now <= da.activated <= datetime.datetime.now(pytz.UTC)
     True
 
 It's still not dead. :-)
@@ -638,10 +666,10 @@
 
     >>> before = datetime.datetime.now(pytz.UTC)
     >>> pollStub(conn)
-    >>> before <= da.last_ping <= datetime.datetime.now(pytz.UTC)
+    >>> before <= da.last_ping.value <= datetime.datetime.now(pytz.UTC)
     True
 
-We don't have any jobs to claim yet.  Let's add one and do it again. 
+We don't have any jobs to claim yet.  Let's add one and do it again.
 We'll use a test fixture, time_flies, to make the time change.
 
     >>> job10 = queue.put(mock_work)
@@ -652,10 +680,10 @@
     ...         datetime.timedelta(seconds=seconds))
     ...
 
-    >>> last_ping = da.last_ping
+    >>> last_ping = da.last_ping.value
     >>> time_flies(5)
     >>> pollStub(conn)
-    >>> da.last_ping == last_ping
+    >>> da.last_ping.value == last_ping
     True
 
     >>> len(jobs_to_do)
@@ -669,11 +697,11 @@
 
     >>> time_flies(10)
     >>> pollStub(conn)
-    >>> da.last_ping == last_ping
+    >>> da.last_ping.value == last_ping
     True
     >>> time_flies(15)
     >>> pollStub(conn)
-    >>> da.last_ping > last_ping
+    >>> da.last_ping.value > last_ping
     True
 
 Dead Dispatchers
@@ -684,7 +712,7 @@
 then proceed.  But what if a queue has more than one simultaneous
 dispatcher?  How do we know to clean out the dead dispatcher's jobs?
 
-The ``ping`` method not only changes the ``last_ping`` but checks the
+The ``ping`` method not only changes the ``last_ping.value`` but checks the
 next sibling dispatcher, as defined by UUID, to make sure that it is not
 dead. It uses the ``dead`` attribute, introduced above, to test whether
 the sibling is alive.
@@ -708,8 +736,8 @@
     False
 
 Let's do that again, with an agent in the new dispatcher and some jobs in the
-agent.  Assigned jobs will be reassigned; in-progress jobs will have a
-new task that fails them; callback jobs will resume their callback; and
+agent. Assigned jobs will be reassigned; in-progress jobs will have a new task
+that calls ``handleInterrupt``; callback jobs will resume their callback; and
 completed jobs will be moved to the completed collection.
 
     >>> alt_agent = alt_da['main'] = StubAgent()
@@ -755,16 +783,385 @@
     4
     >>> queue[1] is jobA
     True
-    >>> queue[2].callable == jobB.fail
+    >>> queue[2].callable == jobB.handleInterrupt
     True
     >>> queue[3].callable == jobC.resumeCallbacks
     True
     >>> alt_agent.completed.first() is jobD
     True
 
+As an aside, notice that the ``handleInterrupt`` and ``resumeCallbacks`` jobs
+have custom error log levels and custom retry policies.
+
+    >>> import logging
+    >>> queue[2].effective_error_log_level == logging.CRITICAL
+    True
+    >>> queue[3].effective_error_log_level == logging.CRITICAL
+    True
+    >>> queue[2].retry_policy_factory is zc.async.job.RetryCommonForever
+    True
+    >>> queue[3].retry_policy_factory is zc.async.job.RetryCommonForever
+    True
+
 If you have multiple workers, it is strongly suggested that you get the
 associated servers connected to a shared time server.
 
+Rescheduled Jobs
+----------------
+
+When a job is interrupted--it was active when it stopped--it asks its retry
+policy whether to retry.  The policy can respond in one of three ways:
+
+- yes, please retry right now;
+
+- yes, please retry later; or
+
+- no, please do not retry, and fail instead.
+
+All three of these possibilities potentially affect the queue in different
+ways, so we will look at examples.
+
+First, let's look at the simple cases for all three.
+
+The scenario above lets us look at what is probably the most common case: the
+default retry policy.  Let's claim the ``handleInterrupt`` task.
+
+    >>> len(queue)
+    4
+    >>> j = alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    >>> j.callable == jobB.handleInterrupt
+    True
+    >>> len(queue)
+    3
+
+The default retry policy's behavior is to retry nine times, for a total of ten
+attempts, and to request that the original job be retried as soon as
+possible--first in line.  That behavior will become more interesting when we
+look at the interaction with quotas below.  For now, performing this job will
+put jobB back in the queue--again, notably, in the very first place in the
+queue.
+
+    >>> j()
+    >>> len(queue)
+    4
+    >>> queue[0] is jobB
+    True
+    >>> jobB is alt_agent.claimJob()
+    True
+    >>> jobB()
+    42
+    >>> len(queue)
+    3
+
+That was an example of the retry policy saying "please retry right now":
+returning ``True``. Now let's look at the other two possible responses.
+
+"please retry later": with a datetime.
+
+    >>> import persistent
+    >>> import datetime
+    >>> class StubRetryPolicy(persistent.Persistent):
+    ...     zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+    ...     _reply = datetime.datetime(3000, 1, 1)
+    ...     def __init__(self, job):
+    ...         self.parent = self.__parent__ = job
+    ...     def jobError(self, failure, data_cache):
+    ...         return self._reply
+    ...     def commitError(self, failure, data_cache):
+    ...         return self._reply
+    ...     def interrupted(self):
+    ...         return self._reply
+    ...     def updateData(self, data_cache):
+    ...         pass
+    ...
+
+    >>> j = zc.async.job.Job(mock_work)
+    >>> j._status = zc.async.interfaces.ACTIVE # hack internals for test
+    >>> j.retry_policy_factory = StubRetryPolicy
+    >>> j_wrap = queue.put(j.handleInterrupt)
+    >>> len(queue)
+    4
+    >>> j_wrap is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    True
+    >>> len(queue)
+    3
+    >>> j_wrap()
+    >>> len(queue)
+    4
+    >>> queue[3] is j
+    True
+    >>> j.begin_after
+    datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+...with a timedelta.
+
+    >>> j = zc.async.job.Job(mock_work)
+    >>> j._status = zc.async.interfaces.ACTIVE # hack internals for test
+    >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j.retry_policy_factory = StubRetryPolicy
+    >>> j_wrap = queue.put(j.handleInterrupt)
+    >>> len(queue)
+    5
+    >>> j_wrap is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    True
+    >>> len(queue)
+    4
+    >>> j_wrap()
+    >>> len(queue)
+    5
+    >>> queue[3] is j
+    True
+    >>> j.begin_after
+    datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+"please do not retry": does not affect the queue, so we will not show it here.
+
+The trickiest aspects occur with quotas.
+
+"please retry now": with a quota.  In this case, the job should retain its
+precise place in the quota if the quota has a limit of one, and should remain
+in the quota if the quota has a greater limit.
+
+    >>> quota = queue.quotas['content catalog']
+    >>> quota[0]()
+    42
+    >>> quota.clean()
+    >>> len(quota)
+    0
+    >>> j = queue.put(mock_work)
+    >>> j.quota_names = ('content catalog',)
+    >>> j2 = queue.put(mock_work)
+    >>> j2.quota_names = ('content catalog',)
+    >>> transaction.commit()
+    >>> len(queue)
+    7
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    True
+    >>> list(quota) == [j]
+    True
+
+    >>> j._status = zc.async.interfaces.ACTIVE # fake beginning to call
+    >>> alt_agent.remove(j)
+    >>> j_wrap = queue.put(j.handleInterrupt)
+
+The ``j`` job still claims the space in the quota, so ``j2`` can't be
+claimed.
+
+    >>> print alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    None
+
+Now we'll get the ``handleInterrupt`` task.  The ``j`` job still cannot be
+removed from the quota.
+
+    >>> j_wrap is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    True
+    >>> len(quota)
+    1
+    >>> quota.clean()
+    >>> list(quota) == [j]
+    True
+
+Now we'll perform the task.  The ``j`` job returns to the queue in a
+``PENDING`` status, and the quota still cannot be cleared.
+
+    >>> j_wrap()
+    >>> len(queue)
+    7
+    >>> queue[0] is j
+    True
+    >>> len(quota)
+    1
+    >>> quota.clean()
+    >>> list(quota) == [j]
+    True
+
+The agent can claim job ``j`` again, however.
+
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    True
+
+The quota still cannot be cleared.
+
+    >>> len(quota)
+    1
+    >>> quota.clean()
+    >>> list(quota) == [j]
+    True
+
+Once we perform ``j``, the quota can be cleared and ``j2`` claimed, as usual.
+
+    >>> j()
+    42
+
+    >>> j2 is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    True
+
+"please retry later": with a quota.  In this case, other jobs can take the
+place of the original job in the quota.
+
+    >>> j3 = queue.put(mock_work)
+    >>> j3.quota_names = ('content catalog',)
+    >>> transaction.commit()
+    >>> j2.retry_policy_factory = StubRetryPolicy
+    >>> list(quota) == [j2]
+    True
+
+    >>> j2._status = zc.async.interfaces.ACTIVE # fake beginning to call
+    >>> alt_agent.remove(j2)
+    >>> j2_wrap = queue.put(j2.handleInterrupt)
+    >>> j2_wrap is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'handleInterrupt')
+    ...
+    True
+    >>> len(queue)
+    6
+    >>> j2_wrap()
+    >>> len(queue)
+    7
+    >>> queue[-2] is j2
+    True
+    >>> j2.status == zc.async.interfaces.PENDING
+    True
+
+    >>> j3 is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            'content catalog' in candidate.quota_names)
+    ...
+    True
+    >>> j3()
+    42
+
+A retry policy can also request rescheduling from a jobError or a commitError,
+although the default retry policies do not.
+
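For illustration, a policy that backs off on repeated errors might look like
the following sketch (not part of this changeset; the class name and the
delays are invented).  Returning a ``datetime.timedelta`` from ``jobError`` or
``commitError`` means "please reschedule this far in the future"::

    import datetime
    import persistent
    import zope.interface
    import zc.async.interfaces

    class BackoffRetryPolicy(persistent.Persistent):
        # Hypothetical policy: on each job or commit error, reschedule the
        # job with an exponentially growing delay; retry interruptions
        # immediately.
        zope.interface.implements(zc.async.interfaces.IRetryPolicy)

        def __init__(self, job):
            self.parent = self.__parent__ = job
            self.errors = 0

        def jobError(self, failure, data_cache):
            self.errors += 1
            return datetime.timedelta(minutes=2 ** min(self.errors, 6))

        def commitError(self, failure, data_cache):
            return self.jobError(failure, data_cache)

        def interrupted(self):
            return True

        def updateData(self, data_cache):
            pass
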
+jobError: "please reschedule now"
+
+    >>> def raiseAnError():
+    ...     raise RuntimeError
+    ...
+    >>> StubRetryPolicy._reply = datetime.timedelta()
+    >>> j = queue.put(raiseAnError, retry_policy_factory=StubRetryPolicy)
+    >>> len(queue)
+    7
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'raiseAnError')
+    ...
+    True
+    >>> len(queue)
+    6
+    >>> j() is j
+    True
+    >>> len(queue)
+    7
+    >>> queue.claim() is j
+    True
+
+jobError: "please reschedule later"
+
+    >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j = queue.put(raiseAnError, retry_policy_factory=StubRetryPolicy)
+    >>> len(queue)
+    7
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'raiseAnError')
+    ...
+    True
+    >>> len(queue)
+    6
+    >>> j() is j
+    True
+    >>> len(queue)
+    7
+    >>> queue[-2] is j
+    True
+    >>> j.begin_after
+    datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+commitError: "please reschedule now"
+
+    >>> error_flag = False
+    >>> old_commit = transaction.TransactionManager.commit
+    >>> def commit(self):
+    ...     global error_flag
+    ...     if error_flag:
+    ...         error_flag = False
+    ...         raise ValueError()
+    ...     old_commit(self)
+    ...
+    >>> transaction.TransactionManager.commit = commit
+    >>> def causeCommitError():
+    ...     global error_flag
+    ...     error_flag = True
+    ...
+    >>> StubRetryPolicy._reply = datetime.timedelta()
+    >>> j = queue.put(causeCommitError, retry_policy_factory=StubRetryPolicy)
+    >>> len(queue)
+    8
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'causeCommitError')
+    ...
+    True
+    >>> len(queue)
+    7
+    >>> j() is j
+    True
+    >>> len(queue)
+    8
+    >>> queue.claim() is j
+    True
+
+commitError: "please reschedule later"
+
+    >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+    >>> j = queue.put(causeCommitError, retry_policy_factory=StubRetryPolicy)
+    >>> len(queue)
+    8
+    >>> j is alt_agent.claimJob(
+    ...     filter=lambda candidate:
+    ...            candidate.callable.__name__ == 'causeCommitError')
+    ...
+    True
+    >>> len(queue)
+    7
+    >>> j() is j
+    True
+    >>> len(queue)
+    8
+    >>> queue[-2] is j
+    True
+    >>> j.begin_after
+    datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+
+    >>> transaction.TransactionManager.commit = old_commit
+
 =========
 Footnotes
 =========
@@ -801,12 +1198,12 @@
     Traceback (most recent call last):
     ...
     KeyError: 'foo'
-    
+
     >>> container['foo'] = None
     Traceback (most recent call last):
     ...
     ValueError: value must be IQueue
-    
+
     >>> del container['']
     >>> len(container)
     0

Modified: zc.async/branches/dev/src/zc/async/testing.py
===================================================================
--- zc.async/branches/dev/src/zc/async/testing.py	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/testing.py	2008-06-17 02:38:06 UTC (rev 87444)
@@ -13,7 +13,9 @@
 ##############################################################################
 import threading
 import bisect
-import time
+from time import sleep as time_sleep # this import style is intentional, so
+# that test monkeypatching of time.sleep does not affect the usage in this
+# module
 import datetime
 
 import pytz
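The renamed import above works because the module binds its own name to the
real ``sleep`` function at import time; tests that later monkeypatch
``time.sleep`` therefore cannot affect it.  A minimal illustration (not part
of the changeset)::

    import time
    from time import sleep as time_sleep  # bind the real function now

    time.sleep = lambda seconds: None     # a test monkeypatches time.sleep
    time_sleep(0.1)                       # still the real sleep: this name
                                          # was bound before the patch
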
@@ -51,6 +53,9 @@
         raw = super(_datetime, self).__repr__()
         return "datetime.datetime%s" % (
             raw[raw.index('('):],)
+    def __add__(self, other):
+        return _datetime(
+            *super(_datetime,self).__add__(other).__reduce__()[1])
     def __reduce__(self):
         return (argh, super(_datetime, self).__reduce__()[1])
 def argh(*args, **kwargs):
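The new ``__add__`` is needed because, under Python 2, ``datetime.datetime``
arithmetic returns the base class rather than the subclass, so without it the
testing module's ``_datetime`` instances would silently turn back into plain
datetimes after adding a ``timedelta``.  A small demonstration of the
underlying behavior (the subclass name here is hypothetical)::

    import datetime

    class FrozenDatetime(datetime.datetime):
        pass

    d = FrozenDatetime(2008, 6, 16) + datetime.timedelta(hours=1)
    print type(d)  # <type 'datetime.datetime'>, not FrozenDatetime, which is
                   # why _datetime.__add__ rebuilds an instance of itself
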
@@ -103,7 +108,7 @@
         assert _when == 'before' and _event == 'shutdown', (
             'unsupported trigger')
         self.triggers.append((_when, _event, _callable, args, kwargs))
-    
+
     def callInThread(self, _callable, *args, **kw):
         # very naive should be fine...
         thread = threading.Thread(target=_callable, args=args, kwargs=kw)
@@ -177,7 +182,7 @@
                     break
             else:
                 break
-            time.sleep(0.1)
+            time_sleep(0.1)
         else:
             print 'TIME OUT'
 
@@ -189,7 +194,7 @@
     for i in range(seconds * 10):
         if len(dispatcher.polls) > count:
             return dispatcher.polls.first()
-        time.sleep(0.1)
+        time_sleep(0.1)
     else:
         assert False, 'no poll!'
 
@@ -198,7 +203,7 @@
         t = transaction.begin()
         if job.status == zc.async.interfaces.COMPLETED:
             return job.result
-        time.sleep(0.1)
+        time_sleep(0.1)
     else:
         assert False, 'job never completed'
 
@@ -208,6 +213,6 @@
         t = transaction.begin()
         if name in job.annotations:
             return job.annotations[name]
-        time.sleep(0.1)
+        time_sleep(0.1)
     else:
         assert False, 'annotation never found'

Modified: zc.async/branches/dev/src/zc/async/utils.py
===================================================================
--- zc.async/branches/dev/src/zc/async/utils.py	2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/utils.py	2008-06-17 02:38:06 UTC (rev 87444)
@@ -14,11 +14,13 @@
 import datetime
 import logging
 import sys
+import time
 
 import ZEO.Exceptions
 import ZODB.POSException
 import rwproperty
 import persistent
+import zope.minmax
 import zc.dict
 import pytz
 import zope.bforest.periodic
@@ -58,39 +60,12 @@
     def __parent__(self, value):
         self._z_parent__ = None
 
+# for legacy databases
+Atom = zope.minmax.Maximum
 
-class Atom(persistent.Persistent):
-    def __init__(self, value):
-        self.value = value
 
-    def __getstate__(self):
-        return self.value
-
-    def __setstate__(self, state):
-        self.value = state
-
-class AtomDescriptor(object):
-    def __init__(self, name, initial=None):
-        self.name = name
-        self.initial = initial
-
-    def __get__(self, obj, klass=None):
-        if obj is None:
-            return self
-        return obj.__dict__[self.name].value
-
-    def __set__(self, obj, value):
-        obj.__dict__[self.name].value = value
-
-    def initialize(self, obj):
-        obj.__dict__[self.name] = Atom(self.initial)
-
-def createAtom(name, initial):
-    sys._getframe(1).f_locals[name] = AtomDescriptor(name, initial)
-
-
 class Dict(zc.dict.Dict, Base):
-    
+
     copy = None # mask
 
     def __setitem__(self, key, value):
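The ``Atom = zope.minmax.Maximum`` alias above works because ``Maximum``
exposes the same ``value`` attribute the old ``Atom`` did, and it additionally
resolves concurrent ZODB writes by keeping the larger of the competing values
rather than raising a conflict error.  A small usage sketch (the variable name
is illustrative)::

    import zope.minmax

    last_poll = zope.minmax.Maximum(None)  # same ``value`` attribute as Atom
    last_poll.value = 42
    # On a concurrent commit, Maximum._p_resolveConflict keeps the larger of
    # the competing values instead of raising a ZODB ConflictError.
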
@@ -264,7 +239,7 @@
             if not trans_ct % 5:
                 log.warning(
                     '%d consecutive transaction errors while %s',
-                    ct, identifier, exc_info=True)
+                    trans_ct, identifier, exc_info=True)
                 res = None
         except EXPLOSIVE_ERRORS:
             tm.abort()
@@ -277,11 +252,11 @@
             tm.abort()
             backoff_ct += 1
             if backoff_ct == 1:
                 log.log(level,
                         'first error while %s; will continue in %d seconds',
                         identifier, backoff, exc_info=True)
-            elif not backoff_ct % 5:
-                
+            elif not backoff_ct % 10:
                 log.log(level,
                         '%d consecutive errors while %s; '
                         'will continue in %d seconds',
@@ -320,7 +295,7 @@
                 log.error('first error while %s; will continue in %d seconds',
                           identifier, backoff, exc_info=True)
             elif not backoff_ct % 5:
-                
+
                 log.error('%d consecutive errors while %s; '
                           'will continue in %d seconds',
                           backoff_ct, identifier, backoff, exc_info=True)
@@ -357,4 +332,4 @@
             tm.abort()
             log.error('Error while %s', identifier, exc_info=True)
             res = zc.twist.Failure()
-        return res
\ No newline at end of file
+        return res


