[Checkins] SVN: zc.async/branches/dev/s checkpoint;
close to release, just needs some clean-up
Gary Poster
gary at zope.com
Mon Jun 16 22:38:08 EDT 2008
Log message for revision 87444:
checkpoint; close to release, just needs some clean-up
Changed:
U zc.async/branches/dev/setup.py
U zc.async/branches/dev/src/zc/async/CHANGES.txt
U zc.async/branches/dev/src/zc/async/README.txt
U zc.async/branches/dev/src/zc/async/TODO.txt
U zc.async/branches/dev/src/zc/async/catastrophes.txt
U zc.async/branches/dev/src/zc/async/configure.py
U zc.async/branches/dev/src/zc/async/dispatcher.py
U zc.async/branches/dev/src/zc/async/dispatcher.txt
U zc.async/branches/dev/src/zc/async/interfaces.py
U zc.async/branches/dev/src/zc/async/job.py
U zc.async/branches/dev/src/zc/async/job.txt
U zc.async/branches/dev/src/zc/async/queue.py
U zc.async/branches/dev/src/zc/async/queue.txt
U zc.async/branches/dev/src/zc/async/testing.py
U zc.async/branches/dev/src/zc/async/utils.py
-=-
Modified: zc.async/branches/dev/setup.py
===================================================================
--- zc.async/branches/dev/setup.py 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/setup.py 2008-06-17 02:38:06 UTC (rev 87444)
@@ -18,7 +18,6 @@
try:
import docutils
except ImportError:
- import warnings
def validateReST(text):
return ''
else:
@@ -97,13 +96,14 @@
'zc.dict>=1.2.1',
'zc.twist>=1.2',
'Twisted>=8.0.1', # 8.0 was setuptools compatible, 8.0.1 had bugfixes.
- # note that Twisted builds with warnings, at least with py2.4. It
+ # note that Twisted builds with warnings with py2.4. It
# seems to still build ok.
'zope.bforest>=1.2',
'zope.component',
'zope.event',
'zope.i18nmessageid',
'zope.interface',
+ 'zope.minmax',
'zope.testing',
'rwproperty',
],
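The new ``zope.minmax`` dependency (per CHANGES, it replaces the locally defined ``Atom``) provides conflict-resolving value holders. Below is a plain-Python sketch of the idea behind a ``Maximum``-style object: the class name ``MaximumSketch`` and the state-dict shapes are illustrative assumptions, not zope.minmax's actual implementation, which is a Persistent object that hooks into ZODB conflict resolution.

```python
# Illustrative sketch only: the real zope.minmax.Maximum is a Persistent
# object whose _p_resolveConflict hook lets ZODB merge concurrent writes
# by keeping the larger value. This plain-Python stand-in shows the idea.

class MaximumSketch(object):
    """Holds a single comparable value; concurrent updates resolve to the max."""

    def __init__(self, value=None):
        self.value = value

    def _p_resolveConflict(self, old_state, committed_state, new_state):
        # ZODB calls a hook like this with three state dicts when two
        # transactions both modified the object; keep the larger value.
        if committed_state['value'] >= new_state['value']:
            return committed_state
        return new_state


atom = MaximumSketch(0)
committed = {'value': 10}   # what another transaction already committed
ours = {'value': 7}         # what this transaction tried to write
resolved = atom._p_resolveConflict({'value': 0}, committed, ours)
print(resolved['value'])  # the larger write wins: 10
```

The payoff is that two processes updating the same counter-like value no longer raise ConflictErrors; the storage merges them.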
Modified: zc.async/branches/dev/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/CHANGES.txt 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/CHANGES.txt 2008-06-17 02:38:06 UTC (rev 87444)
@@ -1,6 +1,57 @@
1.2 (unreleased)
================
+- made the log for finding an activated agent report the pertinent queue's oid
+ as an unpacked integer, rather than the packed string blob. As mentioned
+ above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
+ recognize.
+
+- Bugfix: in failing a job, the job thought it was in its old agent, and the
+ ``fail`` call failed. This is now tested by the first example in new doctest
+ ``catastrophes.txt``.
+
+- jobs no longer default to a ``begin_by`` value of one hour after the
+ ``begin_after``. The default now is no limit.
+
+- Made dispatcher much more robust to transaction errors and ZEO
+ ClientDisconnected errors.
+
+- Jobs now use an IRetryPolicy to decide what to do on failure within a job,
+ within the commit of the result, and if the job is interrupted. This allows
+ support of transactional jobs, transactional jobs that critically must be
+ run to completion, and non-transactional jobs such as communicating with an
+ external service.
+
+- The default retry policy supports retries for ClientDisconnected errors,
+ transaction errors, and interruptions.
+
+- ``job.txt`` has been expanded significantly to show error handling and the
+ use of retry policies. New file ``catastrophes.txt`` shows handling of other
+ catastrophes, such as interruptions to polling.
+
+- job errors now go in the main zc.async.event log rather than in the
+ zc.async.trace log. Successes continue to go in the trace log.
+
+- callback failures go to the main log as a CRITICAL error, by default.
+
+- ``handleInterrupt`` is the new protocol on jobs to inform them that they were
+ active in a dispatcher that is now dead. They either fail or reschedule,
+ depending on the associated IRetryPolicy for the job. If they reschedule,
+ the retry policy should return either a datetime or a timedelta. The job
+ calls the agent's ``reschedule`` method. If the timedelta is zero or
+ negative, or the datetime
+ is earlier than now, the job is put back in the queue with a new ``putBack``
+ method on the queue. This is intended to be the opposite of ``claim``. Jobs
+ put in the queue with ``putBack`` will be pulled out before any others.
+
+- convert to using zope.minmax rather than locally defined ``Atom``.
+
+- Fixed (and simplified) the last_ping code so that it no longer unnecessarily
+ writes the state of the parent DispatcherAgents collection to the database
+ whenever the atom changes.
+
+1.1.1 (2008-05-14)
+==================
+
- more README tweaks.
- converted all reports from the dispatcher, including the monitor output,
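The ``putBack`` ordering described in the changelog entry above (jobs put back are pulled before any others) can be sketched with a toy in-memory queue. This is a hypothetical stand-in: the real zc.async queue is a persistent ZODB container, and ``ToyQueue`` only demonstrates the claim/putBack ordering contract.

```python
from collections import deque

class ToyQueue(object):
    """Toy stand-in for the zc.async queue's claim/putBack ordering.

    The real queue is a persistent ZODB object; this sketch only shows
    that jobs returned via putBack are pulled before normal entries.
    """

    def __init__(self):
        self._jobs = deque()

    def put(self, job):
        self._jobs.append(job)       # normal jobs go to the back

    def putBack(self, job):
        self._jobs.appendleft(job)   # interrupted jobs jump the line

    def claim(self):
        return self._jobs.popleft() if self._jobs else None


q = ToyQueue()
q.put('second')
q.put('third')
q.putBack('interrupted')             # the opposite of claim
print(q.claim())  # 'interrupted' comes out first
```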
@@ -21,15 +72,6 @@
Fixed, which also means we need a new version of zope.bforest (1.2) for a new
feature there.
-- made the log for finding an activated agent report the pertinent queue's oid
- as an unpacked integer, rather than the packed string blob. As mentioned
- above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
- recognize.
-
-- Bugfix: in failing a job, the job thought it was in its old agent, and the
- ``fail`` call failed. This is now tested by the first example in new doctest
- ``catastrophes.txt``.
-
1.1 (2008-04-24)
================
@@ -49,7 +91,7 @@
- Had the ThreadedDispatcherInstaller subscriber stash the thread on the
dispatcher, so you can shut down tests like this:
-
+
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
Modified: zc.async/branches/dev/src/zc/async/README.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README.txt 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/README.txt 2008-06-17 02:38:06 UTC (rev 87444)
@@ -1087,15 +1087,16 @@
>>> t = transaction.begin()
>>> job = queue.put(
- ... send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
+ ... send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC),
+ ... datetime.timedelta(hours=1))
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
- <zc.twist.Failure zc.async.interfaces.AbortedError>
+ <zc.twist.Failure zc.async.interfaces.TimeoutError>
>>> import sys
>>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
- Failure: zc.async.interfaces.AbortedError:
+ Failure: zc.async.interfaces.TimeoutError:
.. [#job] The Job class can take arguments and keyword arguments
for the wrapped callable at call time as well, similar to Python
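The README change above passes an explicit one-hour ``begin_by`` limit to ``put``, so a job that has not started by its deadline now fails with ``TimeoutError`` instead of ``AbortedError``. A sketch of the deadline rule, under the assumption (stated in CHANGES) that a ``begin_by`` of ``None`` means no limit; the function name is made up for illustration.

```python
import datetime

def past_begin_by(begin_after, begin_by, now):
    """Sketch of the begin_by rule: a job that has not started by
    begin_after + begin_by should fail with a timeout. A begin_by of
    None means no limit (the new default described in CHANGES)."""
    if begin_by is None:
        return False
    return now > begin_after + begin_by

begin_after = datetime.datetime(2006, 7, 21, 12, 0, 0)
limit = datetime.timedelta(hours=1)

print(past_begin_by(begin_after, limit, datetime.datetime(2006, 7, 21, 12, 30)))  # False
print(past_begin_by(begin_after, limit, datetime.datetime(2006, 7, 21, 14, 0)))   # True
print(past_begin_by(begin_after, None, datetime.datetime(2010, 1, 1)))            # False: no limit
```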
Modified: zc.async/branches/dev/src/zc/async/TODO.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/TODO.txt 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/TODO.txt 2008-06-17 02:38:06 UTC (rev 87444)
@@ -1,65 +1,10 @@
Bugs and improvements:
-- need retry tasks, particularly for callbacks. ``retry_count`` affects aborts
- and transaction failures? None == infinity, which is what callbacks use?
- Should retry have a cleanup function?
-
-- need CRITICAL logs for callbacks
-
-- when database went away, and then came back, async didn't come back. Fix
- (and also reconsider retry behavior in Dispatcher._commit and
- AgentThreadPool.perform_thread).
-
-Traceback (most recent call last):
- File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/dispatcher.py", line 321, in _commit
- transaction.commit()
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_manager.py", line 93, in commit
- return self.get().commit()
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 325, in commit
- self._commitResources()
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 422, in _commitResources
- rm.tpc_begin(self)
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/Connection.py", line 525, in tpc_begin
- self._normal_storage.tpc_begin(transaction)
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 1079, in tpc_begin
- self._server.tpc_begin(id(txn), txn.user, txn.description,
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 86, in __getattr__
- raise ClientDisconnected()
-ClientDisconnected
-
-Exception in thread Thread-5:
-Traceback (most recent call last):
- File "/opt/cleanpython24/lib/python2.4/threading.py", line 442, in __bootstrap
- self.run()
- File "/opt/cleanpython24/lib/python2.4/threading.py", line 422, in run
- self.__target(*self.__args, **self.__kwargs)
- File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/dispatcher.py", line 153, in perform_thread
- job() # this does the committing and retrying, largely
- File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 290, in __call__
- return self._call_with_retry(
- File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 321, in _call_with_retry
- res = self._complete(zc.twist.Failure(), tm)
- File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 340, in _complete
- tm.commit()
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_manager.py", line 93, in commit
- return self.get().commit()
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 325, in commit
- self._commitResources()
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 422, in _commitResources
- rm.tpc_begin(self)
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/Connection.py", line 525, in tpc_begin
- self._normal_storage.tpc_begin(transaction)
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 1079, in tpc_begin
- self._server.tpc_begin(id(txn), txn.user, txn.description,
- File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 86, in __getattr__
- raise ClientDisconnected()
-ClientDisconnected
-
+- need a way to tell zc.twist.Partial, used for callbacks, to retry forever:
+ there is currently no spelling for this in zc.twist. This makes using
+ deferreds less reliable: ``_callback`` *must* be called.
- be even more pessimistic about memory for saved polls and job info in
dispatcher.
-
-For future versions:
-
- queues should be pluggable like agent with filter
- show how to broadcast, maybe add conveniences
- show how to use with collapsing jobs (hint to future self: use external queue
@@ -85,10 +30,17 @@
Notice that all of the tests in this package use FileStorage.
* callbacks should be very, very quick, and very reliable. If you want to do
something that might take a while, put another job in the queue
-
+More docs:
+
+- custom retry policies, particularly for non-transactional tasks;
+
+- changing the default retry policy, per-process and per-agent; and
+
+- changing the default log level for queue jobs, callback jobs, and per-job.
+
+
For some other package, maybe:
- TTW Management and logging views, as in zasync (see goals in the "History"
section of the README).
-
\ No newline at end of file
Modified: zc.async/branches/dev/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/catastrophes.txt 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/catastrophes.txt 2008-06-17 02:38:06 UTC (rev 87444)
@@ -21,10 +21,9 @@
predictable, and if we knew of a bug, we'd try to fix it, rather than discuss
it here!
-Exploring polling exceptions and job related exceptions will illuminate the
-more specific catastrophes you may encounter, and how your code and zc.async's
-can work together to handle them. We'll discuss each, then drill down into
-some specific scenarios.
+We'll discuss both polling exceptions and job related exceptions, then drill
+down into some specific scenarios. This will illuminate how your code and
+zc.async's can work together to handle them.
Polling Exceptions
------------------
@@ -117,7 +116,16 @@
As discussed elsewhere, if your job fails in your own code, this is mostly
your responsibility. You should handle possible errors both within your job's
-code, and in callbacks, as appropriate. zc.async's responsibilities are merely
+code, and in callbacks, as appropriate.
+
+The other tool at your disposal for this situation, as with others below, is a
+retry policy. Retry policies let you determine what zc.async should do when
+your job fails. The default retry policy for job failures (as well as commit
+failures, below) is that transaction errors, such as conflict errors, are
+retried four times (for a total of five attempts), and a ZEO ClientDisconnected
+error is retried forever with a backoff. You can customize these.
+
+Other than supporting these tools, zc.async's only other responsibilities are
to report.
By default, zc.async will log a failure of a job entered in a queue at the
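The retry-forever-with-backoff behavior that the hunk above describes for ZEO ClientDisconnected errors can be sketched as a simple loop. Everything here is illustrative: the exception class stands in for ``ZEO.Exceptions.ClientDisconnected``, and the interval constants are made up, not the actual backoff values used by zc.async's policies.

```python
import time

class ClientDisconnectedSketch(Exception):
    """Stand-in for ZEO.Exceptions.ClientDisconnected."""

calls = []

def flaky_commit():
    # Fails twice with a simulated disconnect, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise ClientDisconnectedSketch()
    return 'committed'

def retry_forever_with_backoff(fn, initial=0.0, increment=0.01, maximum=0.05,
                               sleep=time.sleep):
    """Retry fn() on disconnects forever, backing off between attempts.

    The interval values here are hypothetical; the real policy's backoff
    constants live in zc.async.job.
    """
    interval = initial
    while True:
        try:
            return fn()
        except ClientDisconnectedSketch:
            sleep(interval)
            interval = min(interval + increment, maximum)

result = retry_forever_with_backoff(flaky_commit)
print(result)  # 'committed' after two simulated disconnects
```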
@@ -129,11 +137,8 @@
zc.async also includes a ``Failure`` object on the job as a result, to let you
react to the problem in a callback, and analyze it later. This is discussed in
-detail elsewhere.
+detail in other documents.
-The RetryPolicy, discussed later, can try to react to exceptions from the job's
-code, but the default policies included in the package do not.
-
Process Ends
............
@@ -156,8 +161,8 @@
fallout.
As we'll see below, zc.async defaults to guessing that jobs placed directly in
-a queue are transactional, and can be restarted up to ten times; and that jobs
-used as callbacks are also transactional, and should be restarted until they
+a queue are transactional, and can be tried up to ten times; and that jobs
+used as callbacks are also transactional, and can be tried until they
succeed. The defaults can be changed and the behavior of an individual
job can be changed.
@@ -171,24 +176,22 @@
ClientDisconnected errors should often cause jobs to be aborted and restarted.
However, if the job is not transactional, such as communicating with an
external service, a simple abort and retry may be hazardous. Also, many jobs
-should be stopped if they retry on ConflictError more than some heuristic
-bellweather, with the logic that they may be simply doing something too
-problematic, and they are blocking other tasks from starting. But other jobs
-should be retried until they complete.
+should be stopped if they retry on ConflictError more than some number of
+times--a heuristic bellwether--with the logic that they may be simply doing
+something too problematic, and they are blocking other tasks from starting. But
+other jobs should be retried until they complete.
As mentioned above, zc.async defaults to guessing that jobs are transactional.
-Client Disconnected errors are retried forever. Jobs placed in a queue retry
-ConflictErrors five times, while callbacks retry them forever, with a small
-backoff. The defaults can be changed and the behavior of an individual
-job can be changed, using the RetryPolicy described below.
+Client Disconnected errors are retried forever, with a small backoff. Jobs
+placed in a queue retry transaction errors, such as ConflictErrors, four times,
+while callbacks retry them forever. The defaults can be changed and the
+behavior of an individual job can be changed, using the RetryPolicy described
+below.
-While custom RetryPolicies can try to handle other transaction errors, they are
-generally considered to be out of zc.async's domain and control.
-
Summary of Job-Related Exceptions
.................................
-If an exception handles in your job's code, zc.async will log it as an ERROR
+If an exception occurs in your job's code, zc.async will log it as an ERROR
if a main queue job and as CRITICAL if it is a callback; and it will make the
result of the call a ``Failure`` with error information, as shown elsewhere.
Everything else is your responsibility, to be handled with try:except or
@@ -221,28 +224,64 @@
--------------
The rest of the document uses scenarios to illustrate how zc.async handles
-errors, and how you might want to configure retry policies. Retry policies are
-given a job, and then exception information, and then can determine whether
-zc.async should retry the job. zc.async comes with three retry policies.
+errors, and how you might want to configure retry policies.
-- One is the absence of a retry policy: do not retry this job. A value of
- ``None`` represents this policy. This is appropriate for tasks that are
- not transactional. They typically need to be handled with a callback.
+What is a retry policy? It is used in three circumstances.
-- The default is a retry policy that retries after a restart up to four times,
- retries a ConflictError up to four times, and retries a ClientDisconnected up
- to four times.
+- When the job starts but fails to complete because the system is interrupted,
+ the job will try to call ``retry_policy.interrupted()`` to get a boolean as
+ to whether the job should be retried.
-- One is a retry policy that never gives up on restarts, ConflictErrors, or
- ClientDisconnected errors: it has no limit but keeps trying forever.
+- When the code the job ran fails, the job will try to call
+ ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to whether
+ the job should be retried.
+- When the commit fails, the job will try to call
+ ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
+ whether the job should be retried.
+
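The three hooks listed above can be sketched as a minimal policy object. The method names (``interrupted``, ``jobError``, ``commitError``) follow the text; the counters and limits are illustrative assumptions, not zc.async's actual defaults or internals.

```python
class CountingRetryPolicySketch(object):
    """Minimal sketch of the three retry-policy hooks described above.

    The counters in ``data`` mimic the data_cache mentioned in the text;
    the limits are hypothetical, chosen only for illustration.
    """

    def __init__(self, max_interruptions=4, max_errors=4):
        self.data = {'interruptions': 0, 'job_errors': 0, 'commit_errors': 0}
        self.max_interruptions = max_interruptions
        self.max_errors = max_errors

    def interrupted(self):
        # Called when the dispatcher died while the job was active.
        self.data['interruptions'] += 1
        return self.data['interruptions'] <= self.max_interruptions

    def jobError(self, failure, data_cache):
        # Called when the job's own code raised.
        self.data['job_errors'] += 1
        return self.data['job_errors'] <= self.max_errors

    def commitError(self, failure, data_cache):
        # Called when committing the job's result failed.
        self.data['commit_errors'] += 1
        return self.data['commit_errors'] <= self.max_errors


policy = CountingRetryPolicySketch(max_interruptions=1)
print(policy.interrupted())  # True: first interruption, retry
print(policy.interrupted())  # False: give up
```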
+Why does this need to be a policy? Can't it be a simpler arrangement?
+
+The heart of the problem is that different jobs need different error
+resolutions.
+
+In some cases, jobs may not be fully transactional. For instance, the job
+may be communicating with an external system, such as a credit card system.
+The retry policy here should typically be "never": perhaps a callback should be
+in charge of determining what to do next.
+
+If a job is fully transactional, it can be retried. But even then the desired
+behavior may differ.
+
+- In typical cases, some errors should simply cause a failure, while other
+ errors, such as database conflict errors, should cause a limited number of
+ retries.
+
+- In some jobs, conflict errors should be retried forever, because the job must
+ be run to completion or else the system should fall over. Callbacks that try
+ to handle errors themselves may take this approach, for instance.
+
+zc.async currently ships with three retry policies.
+
+1. The default, appropriate for most fully transactional jobs, is
+ zc.async.job.RetryCommonFourTimes.
+
+2. The other available (pre-written) option for transactional jobs is
+ zc.async.job.RetryCommonForever. Callbacks will get this policy by
+ default.
+
+Both of these policies retry ZEO disconnects forever. They retry interrupts and
+transaction errors such as conflicts either four times (for a total of five
+attempts) or forever, respectively.
+
+3. The last retry policy is zc.async.job.NeverRetry. This is appropriate for
+ non-transactional jobs. You'll still typically need to handle errors in
+ your callbacks.
+
Scenarios
---------
-We'll examine a number of scenarios, many of which combine problems in polling
-and in jobs. In these scenarios, we'll refer to a job that was placed directly
-in a queue as a "queue job". A job that was performed in any other way is a
-"callback job," because most often these are callbacks.
+We'll examine polling error scenarios and job error scenarios.
- Polling errors
@@ -250,79 +289,22 @@
* The system is polling and gets a ClientDisconnected error.
-- Internal job errors
+- Job errors
- * A worker process is polling and working on a queue job. The queue job fails
- internally.
-
- * A worker process is polling and working on a callback job. The callback job
- fails internally.
-
-- Default retry policy
-
- * A worker process is working on a job with the default retry policy and gets
- a ConflictError during the commit.
-
- * A worker process is working on a job with the default retry policy and gets
- a ClientDisconnected error.
-
* A worker process is working on a job with the default retry policy. The
process dies gracefully and restarts.
-
+
* Like the previous scenario, a worker process is working on a job with the
default retry policy. The process crashes hard (does not die gracefully)
and restarts.
-
+
* Like the previous scenario, a worker process is working on a job with the
default retry policy. The process crashes hard (does not die gracefully)
and a sibling notices and takes over.
-- No retry policy
+ * A worker process is working on a job with the default retry policy and gets
+ an error during the job or the commit.
- * A worker process is working on a job without a retry policy (that is,
- zc.async should not retry it) and gets a ConflictError during the commit.
-
- * A worker process is working on a job without a retry policy (that is,
- zc.async should not retry it) and gets a ClientDisconnected error.
-
- * A worker process is working on a job with no retry policy (that is,
- zc.async should not retry it). The process dies gracefully and restarts.
-
- * Like the previous scenario, a worker process is working on a job with no
- retry policy (that is, zc.async should not retry it). The process crashes
- hard (does not die gracefully) and restarts.
-
- * Like the previous scenario, a worker process is working on a job with no
- retry policy (that is, zc.async should not retry it). The process crashes
- hard (does not die gracefully) and a sibling notices and takes over.
-
-- Retry forever policy
-
- * A worker process is working on a job with the retry-forever policy and gets
- a ConflictError during the commit.
-
- * A worker process is working on a job with the retry-forever policy and gets
- a ClientDisconnected error.
-
- * A worker process is working on a job with the retry-forever policy. The
- process dies gracefully and restarts.
-
- * Like the previous scenario, a worker process is working on a job with the
- retry-forever policy. The process crashes hard (does not die gracefully)
- and restarts.
-
- * Like the previous scenario, a worker process is working on a job with the
- retry-forever policy. The process crashes hard (does not die gracefully)
- and a sibling notices and takes over.
-
-We will close with customizations:
-
-- custom retry policies, particularly for non-transactional tasks;
-
-- changing the default retry policy, per-process and per-agent; and
-
-- changing the default log level for queue jobs, callback jobs, and per-job.
-
-------------------------
Scenarios: Polling Errors
-------------------------
@@ -343,7 +325,8 @@
>>> lock1.acquire()
True
>>> lock2.acquire()
- >>> def acquireLockAndchooseFirst(agent):
+ True
+ >>> def acquireLockAndChooseFirst(agent):
... res = agent.queue.claim()
... if res is not None:
... lock2.release()
@@ -352,13 +335,23 @@
...
>>> import zc.async.instanceuuid
>>> import zc.async.interfaces
+ >>> import zc.async.testing
+ >>> import zc.async.dispatcher
+ >>> import pprint
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> pprint.pprint(zc.async.testing.get_poll(dispatcher, 0))
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
>>> import transaction
>>> _ = transaction.begin()
>>> queues = root[zc.async.interfaces.KEY]
>>> queue = queues['']
>>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
>>> agent = da['main']
- >>> agent.chooser = acquireLockAndchooseFirst
+ >>> agent.chooser = acquireLockAndChooseFirst
>>> def returnSomething():
... return 42
...
@@ -371,6 +364,7 @@
same job.
>>> lock2.acquire()
+ True
>>> _ = transaction.begin()
>>> job is queue.pull()
True
@@ -386,7 +380,12 @@
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import zc.async.testing
- >>> zc.async.testing.get_poll(dispatcher)
+ >>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
And if we put the job back, it will be performed.
@@ -408,29 +407,49 @@
>>> lock2.locked()
True
+ >>> agent.chooser = acquireLockAndChooseFirst
>>> job = queue.put(returnSomething)
>>> transaction.commit()
>>> lock2.acquire()
+ True
>>> import ZEO.Exceptions
>>> def commit(self):
... raise ZEO.Exceptions.ClientDisconnected()
...
>>> import transaction
- >>> old_commit = transaction.TranasctionManager.commit
- >>> transaction.TranasctionManager.commit = commit
+ >>> old_commit = transaction.TransactionManager.commit
+ >>> transaction.TransactionManager.commit = commit
+ >>> import time
+ >>> sleep_requests = []
+ >>> def sleep(i):
+ ... sleep_requests.append(i)
+ ...
+ >>> old_sleep = time.sleep
+ >>> time.sleep = sleep
+ >>> agent.chooser = zc.async.agent.chooseFirst
+ >>> transaction.commit()
>>> lock1.release()
- >>> zc.async.testing.get_poll(dispatcher)
- >>> transaction.TranasctionManager.commit = old_commit
+ >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [(..., 'unnamed')],
+ 'size': 3}}}
+ >>> transaction.TransactionManager.commit = old_commit
>>> zc.async.testing.wait_for_result(job)
42
+ >>> bool(sleep_requests)
+ True
Here's another variant that mimics being unable to read the storage during a
-poll, and then recouperating.
+poll, and then recuperating.
- >>> error_raised = 0
+ >>> error_raised = False
>>> def raiseDisconnectedThenChooseFirst(agent):
+ ... global error_raised
... if not error_raised:
+ ... error_raised = True
... raise ZEO.Exceptions.ClientDisconnected()
... return agent.queue.claim()
>>> agent.chooser = raiseDisconnectedThenChooseFirst
@@ -439,99 +458,395 @@
...
>>> job = queue.put(returnSomething)
>>> transaction.commit()
- >>> zc.async.testing.get_poll(dispatcher)
+ >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
+ {'': {'main': {'active jobs': [],
+ 'error': <zc.twist.Failure ...ClientDisconnected>,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
>>> zc.async.testing.wait_for_result(job)
42
-------------------------------
-Scenarios: Internal Job Errors
-------------------------------
+-----------------------------
+Scenarios: Job-Related Errors
+-----------------------------
-Queue Job
----------
+Graceful Shutdown During Job
+----------------------------
-Callback Job
-------------
+First let's consider how a failed job with a callback or two is handled when
+the dispatcher dies.
--------------------------------
-Scenarios: Default Retry Policy
--------------------------------
+Here we start a job.
-ConflictError
--------------
+ >>> import zope.component
+ >>> import transaction
+ >>> import zc.async.interfaces
+ >>> import zc.async.testing
+ >>> import zc.async.dispatcher
-ClientDisconnected
-------------------
+ >>> queue = root[zc.async.interfaces.KEY]['']
+ >>> lock = threading.Lock()
+ >>> lock.acquire()
+ True
+ >>> fail_flag = True
+ >>> def wait_for_me():
+ ... global fail_flag
+ ... if fail_flag:
+ ... fail_flag = False
+ ... lock.acquire()
+ ... lock.release() # so we can use the same lock again later
+ ... raise SystemExit() # this will cause the worker thread to exit
+ ... else:
+ ... return 42
+ ...
+ >>> def handle_result(result):
+ ... return 'I got result %r' % (result,)
+ ...
+ >>> job = queue.put(wait_for_me)
+ >>> callback_job = job.addCallback(handle_result)
+ >>> transaction.commit()
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> wait_for_start(job)
-Graceful Shutdown
------------------
+In this scenario, ``wait_for_me`` is a job that, the first time it is run, will
+"unexpectedly" be lost while the dispatcher stops working. ``handle_result``
+will simply show us that callbacks will be called successfully.
-Hard Crash
-----------
+The job has started. Now, the dispatcher suddenly dies without the thread
+performing ``wait_for_me`` getting a chance to finish. For our first example,
+let's give the dispatcher a graceful exit. The dispatcher gets a chance to
+clean up its dispatcher agents, and job.handleInterrupt() goes into the queue.
-Hard Crash and Sibling
-----------------------
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ >>> wait_to_deactivate(dispatcher)
+ >>> _ = transaction.begin()
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+ >>> len(queue)
+ 1
+ >>> interrupt_job = queue[0]
+ >>> interrupt_job # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[0].callable # doctest: +ELLIPSIS
+ <bound method Job.handleInterrupt of <...Job ... ``...wait_for_me()``>>
---------------------------
-Scenarios: No Retry Policy
---------------------------
+Now when the process starts back up again, ``handleInterrupt`` checks with the
+default retry policy as to what should be done. It requests that the job be
+retried. It's put back in the queue, and it is called again normally.
-ConflictError
--------------
+ >>> old_dispatcher = dispatcher
+ >>> zc.async.dispatcher.clear()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> zc.async.testing.wait_for_result(interrupt_job)
-ClientDisconnected
-------------------
+Now we need to wait for the job.
-Graceful Shutdown
------------------
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+ >>> callback_job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> callback_job.result
+ 'I got result 42'
-Hard Crash
-----------
+The job now has a retry policy with some currently non-interface values that
+are still worth showing here.
-Hard Crash and Sibling
-----------------------
+ >>> policy = job.getRetryPolicy()
+ >>> policy.data.get('interruptions')
+ 1
--------------------------------
-Scenarios: Retry-Forever Policy
--------------------------------
+This shows that the policy registered one interruption. [#cleanup1]_
-ConflictError
--------------
+Hard Crash During Job
+---------------------
-ClientDisconnected
-------------------
+Our next catastrophe only changes one aspect to the previous one: the
+dispatcher does not stop gracefully, and does not have a chance to clean up its
+active jobs. It is a "hard" crash.
-Graceful Shutdown
------------------
+To show this, we will start a job, simulate the dispatcher dying "hard," and
+restart it so it can clean up.
-Hard Crash
-----------
+So, first we start a long-running job in the dispatcher.
-Hard Crash and Sibling
-----------------------
+ >>> lock.acquire()
+ True
+ >>> fail_flag = True
+ >>> job = queue.put(wait_for_me)
+ >>> callback_job = job.addCallback(handle_result)
+ >>> transaction.commit()
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> wait_for_start(job)
---------------
-Customizations
---------------
+Now we'll "crash" the dispatcher.
-Changing the Default Retry Policy
----------------------------------
+ >>> dispatcher.activated = False # this will make polling stop, without
+ ... # cleanup
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
+ >>> dispatcher.thread.join(3)
-Creating a New Retry Policy
----------------------------
+Hard crashes can be detected because the dispatchers write datetimes to the
+database every few polls. A given dispatcher instance does this for each queue
+on a ``DispatcherAgents`` object available in ``queue.dispatchers[UUID]``,
+where ``UUID`` is the uuid of that dispatcher.
-Changing the Log Level
-----------------------
+The ``DispatcherAgents`` object has four pertinent attributes:
+``ping_interval``, ``ping_death_interval``, ``last_ping.value``, and ``dead``.
+About every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is
+supposed to write a ``datetime`` to ``last_ping.value``. If the
+``last_ping.value`` plus the ``ping_death_interval`` (also a ``timedelta``) is
+older than now, the dispatcher is considered to be ``dead``, and old jobs
+should be cleaned up.
+The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
+defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
+least two or three poll intervals (``zc.async.dispatcher.get().poll_interval``)
+greater than the ``ping_interval``.
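The liveness rule just described can be sketched as a tiny standalone check (the helper name is ours; the real logic lives in the ``DispatcherAgents`` ping machinery):

```python
import datetime

def considered_dead(last_ping, ping_death_interval, now=None):
    # a dispatcher is considered "dead" once last_ping plus the death
    # interval is older than now (hypothetical helper mirroring the rule)
    if now is None:
        now = datetime.datetime.utcnow()
    return last_ping + ping_death_interval < now

now = datetime.datetime(2008, 6, 17, 2, 38, 6)
ping = now - datetime.timedelta(seconds=45)
considered_dead(ping, datetime.timedelta(seconds=60), now)  # False
considered_dead(ping, datetime.timedelta(seconds=1), now)   # True
```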
+The ping hasn't timed out yet, so the dispatcher isn't considered dead yet.
+ >>> _ = transaction.begin()
+ >>> import zc.async.instanceuuid
+ >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+ >>> da.ping_death_interval
+ datetime.timedelta(0, 60)
+ >>> da.ping_interval
+ datetime.timedelta(0, 30)
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+Therefore, the job is still sitting around in the dispatcher's pile in the
+database (the ``main`` key is for the ``main`` agent installed in this
+dispatcher in the setup for these examples).
+ >>> job in da['main']
+ True
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+Let's start our dispatcher up again.
+ >>> old_dispatcher = dispatcher
+ >>> zc.async.dispatcher.clear()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+ >>> dispatcher = zc.async.dispatcher.get()
+Initially, it's going to be a bit confused, because it sees that the
+DispatcherAgents object is ``activated``, and not ``dead``. It can't tell if
+there's another process using the same UUID, or if it is looking at the result
+of a hard crash.
+ >>> zc.async.testing.wait_for_result(job, seconds=1)
+ Traceback (most recent call last):
+ ...
+ AssertionError: job never completed
+ >>> zc.async.testing.get_poll(dispatcher, seconds=1)
+ {'': None}
+ >>> for r in reversed(event_logs.records):
+ ... if r.levelname == 'ERROR':
+ ... break
+ ... else:
+ ... assert False, 'did not find log'
+ ...
+ >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ UUID ... already activated in queue (oid 4): another process?
+ (To stop poll attempts in this process, set
+ ``zc.async.dispatcher.get().activated = False``. To stop polls
+ permanently, don't start a zc.async.dispatcher!)
+To speed up our dispatcher's realization that the previous activation is
+``dead``, we'll set the ``ping_death_interval`` to just one second.
+
+ >>> _ = transaction.begin()
+ >>> da.dead
+ False
+ >>> job in da['main']
+ True
+ >>> len(queue)
+ 0
+ >>> import datetime
+ >>> da.ping_death_interval = datetime.timedelta(seconds=1)
+ >>> da.dead
+ True
+ >>> bool(da.activated)
+ True
+ >>> transaction.commit()
+
+After the next poll, the dispatcher will have cleaned up its old tasks in the
+same way we saw in the previous example. The job's ``handleInterrupt`` method
+will be called, and the job will be put back in the queue to be retried. The
+DispatcherAgents object is no longer dead, because it is tied to the new
+instance of the dispatcher.
+
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> def wait_for_pending(job):
+ ... for i in range(60):
+ ... t = transaction.begin()
+ ...         if job.status == zc.async.interfaces.PENDING:
+ ... break
+ ... time_sleep(0.1)
+ ... else:
+ ... assert False, 'job never pending'
+ >>> wait_for_pending(job)
+ >>> job in da['main']
+ False
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+ >>> queue[0] is job
+ True
+
+Now we need to wait for the job.
+
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+ >>> callback_job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> callback_job.result
+ 'I got result 42'
+ >>> policy = job.getRetryPolicy()
+ >>> policy.data.get('interruptions')
+ 1
+
+The dispatcher cleaned up its own "hard" crash.
+
+[#cleanup2]_
+
+Hard Crash During Job with Sibling Recovery
+-------------------------------------------
+
+Our next catastrophe is the same as the one before, except that, after one
+dispatcher's hard crash, another dispatcher is around to clean up the dead
+dispatcher's jobs.
+
+To show this, we will start a job, start a second dispatcher, simulate the
+first dispatcher dying "hard," and watch the second dispatcher clean up
+after the first one.
+
+So, first we start a long-running job in the dispatcher as before.
+
+ >>> lock.acquire()
+ True
+ >>> fail_flag = True
+ >>> job = queue.put(wait_for_me)
+ >>> callback_job = job.addCallback(handle_result)
+ >>> transaction.commit()
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> wait_for_start(job)
+
+Now we'll start up an alternate dispatcher.
+
+ >>> import uuid
+ >>> alt_uuid = uuid.uuid1()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.5, uuid=alt_uuid)(
+ ... zc.async.interfaces.DatabaseOpened(db))
+ >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
+
+We're also going to set the main dispatcher's ``ping_death_interval`` back to
+60 seconds so we can see some polls in the alternate dispatcher before it gets
+around to cleaning up.
+
+ >>> da.ping_death_interval = datetime.timedelta(seconds=60)
+ >>> transaction.commit()
+
+Now we'll "crash" the dispatcher.
+
+ >>> dispatcher.activated = False # this will make polling stop, without
+ ... # cleanup
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
+ >>> dispatcher.thread.join(3)
+
+As discussed in the previous example, the ping hasn't timed out yet, so the
+alternate dispatcher can't know that the first one is dead. Therefore, the job
+is still sitting around in the old dispatcher's pile in the database.
+
+ >>> _ = transaction.begin()
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+ >>> alt_poll_1 = zc.async.testing.get_poll(alt_dispatcher)
+ >>> _ = transaction.begin()
+ >>> job in da['main']
+ True
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+ >>> alt_poll_2 = zc.async.testing.get_poll(alt_dispatcher)
+ >>> _ = transaction.begin()
+ >>> job in da['main']
+ True
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+
+Above, the ``ping_death_interval`` was returned to the default of 60 seconds.
+To speed up the second dispatcher's realization that the first one is dead,
+we'll set the ``ping_death_interval`` back down to just one second.
+
+ >>> da.ping_death_interval
+ datetime.timedelta(0, 60)
+ >>> import datetime
+ >>> da.ping_death_interval = datetime.timedelta(seconds=1)
+ >>> da.dead
+ True
+ >>> bool(da.activated)
+ True
+ >>> transaction.commit()
+
+After the second dispatcher gets a poll--a chance to notice--it will have
+cleaned up the first dispatcher's old tasks in the same way we saw in the
+previous example. The job's ``handleInterrupt`` method will be called, which
+in this case will put it back in the queue to be claimed and performed.
+
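The requeue-or-abort decision just described can be sketched with stand-in objects (all names here are illustrative; the real method is the job's ``handleInterrupt``, consulting its ``IRetryPolicy``):

```python
class CountingPolicy(object):
    # stand-in for an IRetryPolicy: retry up to max_interruptions times
    def __init__(self, max_interruptions=9):
        self.max_interruptions = max_interruptions
        self.count = 0
    def interrupted(self):
        self.count += 1
        return self.count <= self.max_interruptions

def handle_interrupt(policy, queue, job):
    # simplified sketch: ask the policy whether to retry after the
    # dispatcher died; put the job back in the queue on True
    if policy.interrupted():
        queue.append(job)  # back to PENDING, to be claimed again
        return 'requeued'
    return 'aborted'       # the real job fails with AbortedError

queue = []
policy = CountingPolicy(max_interruptions=1)
handle_interrupt(policy, queue, 'job')  # 'requeued'
handle_interrupt(policy, queue, 'job')  # 'aborted' on the second interrupt
```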
+ >>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
+ >>> _ = transaction.begin()
+ >>> job in da['main']
+ False
+ >>> bool(da.activated)
+ False
+ >>> da.dead
+ True
+ >>> wait_for_pending(job)
+ >>> queue[0] is job
+ True
+
+Now we need to wait for the job.
+
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+ >>> callback_job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> callback_job.result
+ 'I got result 42'
+
+The sibling, then, was able to clean up the mess left by the "hard" crash of
+the first dispatcher.
+
+[#cleanup3]_
+
+Other Job-Related Errors
+------------------------
+
+Other problems--errors when performing or committing jobs--are handled within
+jobs, getting the decisions from retry policies as described above. These
+are demonstrated in the job.txt document.
+
.. ......... ..
.. Footnotes ..
.. ......... ..
@@ -541,8 +856,8 @@
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
... 'main.fs', create=True)
- >>> from ZODB.DB import DB
- >>> db = DB(storage)
+ >>> from ZODB.DB import DB
+ >>> db = DB(storage)
>>> conn = db.open()
>>> root = conn.root()
>>> import zc.async.configure
@@ -560,13 +875,14 @@
>>> import transaction
>>> _ = transaction.begin()
- >>> import time
+ >>> from time import sleep as time_sleep # we import in this manner so
+ ... # that our testing monkey-patch of time.sleep does not affect our tests
>>> def wait_for_start(job):
... for i in range(60):
... t = transaction.begin()
... if job.status == zc.async.interfaces.ACTIVE:
... break
- ... time.sleep(0.1)
+ ... time_sleep(0.1)
... else:
... assert False, 'job never started'
@@ -574,6 +890,40 @@
... for i in range(60):
... if dispatcher.activated == False:
... break
- ... time.sleep(0.1)
+ ... time_sleep(0.1)
... else:
- ... assert False, 'dispatcher never deactivated'
\ No newline at end of file
+ ... assert False, 'dispatcher never deactivated'
+
+.. [#cleanup1]
+
+ >>> lock.release()
+ >>> old_dispatcher.thread.join(3)
+ >>> old_dispatcher.dead_pools[0].threads[0].join(3)
+
+.. [#cleanup2]
+
+ >>> lock.release()
+ >>> old_dispatcher.thread.join(3)
+ >>> for queue_pools in old_dispatcher.queues.values():
+ ... for name, pool in queue_pools.items():
+ ... pool.setSize(0)
+ ... for thread in pool.threads:
+ ... thread.join(3)
+ ...
+ -3
+
+.. [#cleanup3]
+
+ >>> lock.release()
+ >>> dispatcher.thread.join(3)
+ >>> for queue_pools in dispatcher.queues.values():
+ ... for name, pool in queue_pools.items():
+ ... pool.setSize(0)
+ ... for thread in pool.threads:
+ ... thread.join(3)
+ ...
+ -3
+ >>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
+ >>> alt_dispatcher.thread.join(3)
+ >>> alt_dispatcher.dead_pools[0].threads[0].join(3)
+ >>> time.sleep = old_sleep
Modified: zc.async/branches/dev/src/zc/async/configure.py
===================================================================
--- zc.async/branches/dev/src/zc/async/configure.py 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/configure.py 2008-06-17 02:38:06 UTC (rev 87444)
@@ -56,4 +56,4 @@
def base():
# see comment in ``minimal``, above
minimal()
- zope.component.provideAdapter(zc.twist.connection)
\ No newline at end of file
+ zope.component.provideAdapter(zc.twist.connection)
Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py 2008-06-17 02:38:06 UTC (rev 87444)
@@ -21,6 +21,7 @@
import twisted.python.failure
import twisted.internet.defer
import ZODB.POSException
+import ZEO.Exceptions
import ZODB.utils
import BTrees
import transaction
@@ -55,7 +56,7 @@
def __init__(self):
self._event = threading.Event()
-
+
def setResult(self, value):
self.result = value
self._event.set()
@@ -119,6 +120,9 @@
class AgentThreadPool(object):
_size = 0
+ initial_backoff = 5
+ incremental_backoff = 5
+ maximum_backoff = 60
def __init__(self, dispatcher, name, size):
self.dispatcher = dispatcher
@@ -134,22 +138,52 @@
local.dispatcher = self.dispatcher
conn = self.dispatcher.db.open()
try:
- job = self.queue.get()
- while job is not None:
- identifier, dbname, info = job
+ job_info = self.queue.get()
+ while job_info is not None:
+ identifier, dbname, info = job_info
info['thread'] = thread.get_ident()
info['started'] = datetime.datetime.utcnow()
zc.async.utils.tracelog.info(
'starting in thread %d: %s',
info['thread'], info['call'])
+ backoff = self.initial_backoff
+ conflict_retry_count = 0
try:
- transaction.begin()
- if dbname is None:
- local_conn = conn
- else:
- local_conn = conn.get_connection(dbname)
- job = local_conn.get(identifier)
- local.job = job
+ while 1:
+ try:
+ transaction.begin()
+ if dbname is None:
+ local_conn = conn
+ else:
+ local_conn = conn.get_connection(dbname)
+ job = local_conn.get(identifier)
+ # this setstate should trigger any initial problems
+ # within the try/except retry structure here.
+ local_conn.setstate(job)
+ local.job = job
+ except ZEO.Exceptions.ClientDisconnected:
+ zc.async.utils.log.info(
+ 'ZEO client disconnected while trying to '
+ 'get job %d in db %s; retrying in %d seconds',
+ ZODB.utils.u64(identifier), dbname or '',
+ backoff)
+ time.sleep(backoff)
+ backoff = min(self.maximum_backoff,
+ backoff + self.incremental_backoff)
+ except ZODB.POSException.TransactionError:
+ # continue, i.e., try again
+ conflict_retry_count += 1
+ if (conflict_retry_count == 1 or
+ not conflict_retry_count % 5):
+ zc.async.utils.log.warning(
+ '%d transaction error(s) while trying to '
+ 'get job %d in db %s',
+ conflict_retry_count,
+ ZODB.utils.u64(identifier), dbname or '',
+ exc_info=True)
+ # now ``while 1`` loop will continue, to retry
+ else:
+ break
try:
job() # this does the committing and retrying, largely
except zc.async.interfaces.BadStatusError:
@@ -210,14 +244,14 @@
zc.async.utils.tracelog.info(
'completed in thread %d: %s',
info['thread'], info['call'])
- job = self.queue.get()
+ job_info = self.queue.get()
finally:
conn.close()
if self.dispatcher.activated:
# this may cause some bouncing, but we don't ever want to end
# up with fewer than needed.
self.dispatcher.reactor.callFromThread(self.setSize)
-
+
def setSize(self, size=None):
# this should only be called from the thread in which the reactor runs
# (otherwise it needs locks)
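The capped linear backoff used for ``ClientDisconnected`` retries above can be sketched on its own (constants from ``AgentThreadPool``; the helper name is ours):

```python
def next_backoff(backoff, incremental_backoff=5, maximum_backoff=60):
    # mirrors the min() in the retry loop: grow the sleep linearly,
    # never exceeding the cap
    return min(maximum_backoff, backoff + incremental_backoff)

delays = []
backoff = 5  # initial_backoff
for attempt in range(14):
    delays.append(backoff)
    backoff = next_backoff(backoff)
# delays climb 5, 10, 15, ... and plateau at 60
```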
Modified: zc.async/branches/dev/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.txt 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/dispatcher.txt 2008-06-17 02:38:06 UTC (rev 87444)
@@ -8,29 +8,29 @@
must have several methods:
class IReactor(zope.interface.Interface):
-
+
def callFromThread(callable, *args, **kw):
"""have callable run in reactor's thread, by reactor, ASAP.
-
+
Intended to be called from a thread other than the reactor's main
loop.
"""
-
+
def callInThread(callable, *args, **kw):
"""have callable run in a separate thread, ASAP.
-
+
Must be called in same thread as reactor's main loop.
"""
-
+
def callLater(seconds, callable, *args, **kw):
"""have callable run in reactor at least <seconds> from now
-
+
Must be called in same thread as reactor's main loop.
"""
def addSystemEventTrigger(phase, event, callable, *args, **kw):
"""Install a callable to be run in phase of event.
-
+
must support phase 'before', and event 'shutdown'.
"""
@@ -140,7 +140,7 @@
- The queue fired events to announce the dispatcher's registration and
activation. We could have registered subscribers for either or both
of these events to create agents.
-
+
Note that the dispatcher in queue.dispatchers is a persistent
representative of the actual dispatcher: they are different objects.
@@ -161,17 +161,17 @@
True
- The dispatcher made its first ping. A ping means that the dispatcher changes
- a datetime to record that it is alive.
+ a datetime to record that it is alive.
- >>> queue.dispatchers[dispatcher.UUID].last_ping is not None
+ >>> queue.dispatchers[dispatcher.UUID].last_ping.value is not None
True
- The dispatcher needs to update its last_ping after every ``ping_interval``
- seconds. If it has not updated the last_ping after ``ping_death_interval``
- then the dispatcher is considered to be dead, and active jobs in the
- dispatcher's agents are ended (and given a chance to respond to that status
- change, so they can put themselves back on the queue to be restarted if
- desired).
+ The dispatcher needs to update its ``last_ping.value`` after every
+ ``ping_interval`` seconds. If it has not updated the ``last_ping.value``
+ after ``ping_death_interval`` then the dispatcher is considered to be dead,
+ and active jobs in the dispatcher's agents are ended (and given a chance to
+ respond to that status change, so they can put themselves back on the queue
+ to be restarted if desired).
>>> queue.dispatchers[dispatcher.UUID].ping_interval
datetime.timedelta(0, 30)
@@ -180,7 +180,7 @@
- We have some log entries. (We're using some magic log handlers inserted by
setup code in tests.py here.)
-
+
>>> print event_logs # doctest: +ELLIPSIS
zc.async.events INFO
attempting to activate dispatcher ...
@@ -282,8 +282,8 @@
{'main':
{'active jobs': [], 'error': None,
'new jobs': [(..., 'unnamed')], 'len': 0, 'size': 3}}}
-
+
[#getPollInfo]_ Notice our ``new jobs`` from the poll and the log has a value
in it now. We can get some information about that job from the dispatcher.
@@ -350,8 +350,18 @@
>>> badjob.parent # doctest: +ELLIPSIS
<zc.async.agent.Agent object at 0x...>
- >>> len(badjob.parent)
- 0
+ >>> import time
+ >>> for i in range(60):
+ ... if len(badjob.parent) == 0:
+ ... print 'yay'
+ ... break
+ ... else:
+ ... time.sleep(0.1)
+ ... _ = transaction.begin()
+ ... else:
+ ... print 'oops'
+ ...
+ yay
``zc.async.local`` also allows some fun tricks. Your callable can access the
queue, perhaps to put another job in.
@@ -371,7 +381,7 @@
of the last database sync for the thread's connection (at transaction
boundaries). This function is ``zc.async.local.getJob()``, and is seen below.
-It can also get and set job annotations *live, in another connection*.
+It can also get and set job annotations *live, in another connection*.
This allows you to send messages about job progress, or get live
information about whether you should change or stop your work, for
instance.
@@ -452,8 +462,8 @@
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
... 'main.fs', create=True)
- >>> from ZODB.DB import DB
- >>> db = DB(storage)
+ >>> from ZODB.DB import DB
+ >>> db = DB(storage)
>>> conn = db.open()
>>> root = conn.root()
>>> import zc.async.configure
@@ -461,7 +471,7 @@
.. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
find this poll information also.
-
+
>>> dispatcher.getPollInfo(at=poll.key) is poll
True
>>> dispatcher.getPollInfo(at=poll.utc_timestamp) is poll
@@ -480,7 +490,7 @@
.. [#show_error] OK, so you want to see a verbose traceback? OK, you asked
for it. We're eliding more than 90% of this, and this is a small one,
believe it or not. Rotate your logs!
-
+
Notice that all of the values in the logs are reprs.
>>> bad_job = queue.put(
@@ -489,8 +499,8 @@
>>> wait_for_result(bad_job)
<zc.twist.Failure exceptions.TypeError>
-
- >>> for r in reversed(trace_logs.records):
+
+ >>> for r in reversed(event_logs.records):
... if r.levelname == 'ERROR':
... break
... else:
@@ -500,12 +510,9 @@
<zc.async.job.Job (oid ..., db 'unnamed')
``<built-in function mul>(14, None)``> failed with traceback:
*--- Failure #... (pickled) ---
- .../zc/async/job.py:...: _call_with_retry(...)
+ .../zc/async/job.py:...: __call__(...)
[ Locals ]...
( Globals )...
- .../zc/async/job.py:...: <lambda>(...)
- [ Locals ]...
- ( Globals )...
exceptions.TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'
*--- End of Failure #... ---
<BLANKLINE>
Modified: zc.async/branches/dev/src/zc/async/interfaces.py
===================================================================
--- zc.async/branches/dev/src/zc/async/interfaces.py 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/interfaces.py 2008-06-17 02:38:06 UTC (rev 87444)
@@ -28,12 +28,12 @@
except ImportError:
class IDatabaseOpenedEvent(zope.interface.Interface):
"""The main database has been opened."""
-
+
database = zope.interface.Attribute("The main database.")
-
+
class DatabaseOpened(object):
zope.interface.implements(IDatabaseOpenedEvent)
-
+
def __init__(self, database):
self.database = database
@@ -51,31 +51,31 @@
class IReactor(zope.interface.Interface):
"""This describes what the dispatcher expects of the reactor.
-
+
The reactor does not need to actually provide this interface."""
-
+
def callFromThread(callable, *args, **kw):
"""have callable run in reactor's thread, by reactor, ASAP.
-
+
Intended to be called from a thread other than the reactor's main
loop.
"""
-
+
def callInThread(callable, *args, **kw):
"""have callable run in a separate thread, ASAP.
-
+
Must be called in same thread as reactor's main loop.
"""
-
+
def callLater(seconds, callable, *args, **kw):
"""have callable run in reactor at least <seconds> from now
-
+
Must be called in same thread as reactor's main loop.
"""
def addSystemEventTrigger(phase, event, callable, *args, **kw):
"""Install a callable to be run in phase of event.
-
+
must support phase 'before', and event 'shutdown'.
"""
@@ -83,10 +83,35 @@
"""run callable now if running, or when started.
"""
+class IRetryPolicy(zope.interface.Interface):
+ def jobError(failure, data_cache):
+ """whether and how to retry after an error while performing job.
+ return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue. An empty timedelta means to reschedule
+ for immediately, before any pending calls in the queue."""
+
+ def commitError(failure, data_cache):
+ """whether to retry after trying to commit a job's successful result.
+
+ return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue. An empty timedelta means to reschedule
+ for immediately, before any pending calls in the queue."""
+
+ def interrupted():
+ """whether to retry after a dispatcher dies when job was in progress.
+
+ return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue. An empty timedelta means to reschedule
+ for immediately, before any pending calls in the queue."""
+
+ def updateData(data_cache):
+ """right before committing a job, retry is given a chance to stash
+ information it has saved in the data_cache."""
+
class IObjectEvent(zope.interface.Interface):
"""Event happened to object"""
-
+
object = zope.interface.Attribute('the object')
class AbstractObjectEvent(object):
@@ -136,9 +161,13 @@
class AbortedError(Exception):
"""An explicit abort, as generated by the default behavior of
- IJob.fail"""
+ IJob.handleInterrupt"""
+class TimeoutError(Exception):
+    """A timeout caused by a ``begin_by`` value."""
+
+
class BadStatusError(Exception):
"""The job is not in the status it should be for the call being made.
This is almost certainly a programmer error."""
@@ -168,7 +197,7 @@
"""One of constants defined in zc.async.interfaces:
NEW, PENDING, ASSIGNED, ACTIVE, CALLBACKS, COMPLETED.
- NEW means not added to a queue and not yet called.
+ NEW means not added to a queue and not yet called.
PENDING means addded to a queue but not an agent, and not yet called.
ASSIGNED means added to an agent and not yet called.
ACTIVE means in the process of being called.
@@ -207,10 +236,8 @@
self.args for the call, and any kwargs effectively update self.kwargs
for the call."""
- def fail(e=AbortedError):
- """Fail this job, with option error e. May only be called when
- job is in PENDING or ACTIVE states, or else raises BadStatusError.
- If e is not provided,"""
+ def handleInterrupt():
+ """use IRetryPolicy to decide whether to abort."""
def resumeCallbacks():
"""Make all callbacks remaining for this job. Any callbacks
@@ -229,7 +256,7 @@
# """a set of selected worker UUIDs. If it is empty, it is
# interpreted as the set of all available workerUUIDs. Only
# workers with UUIDs in the set may perform it.
-#
+#
# If a worker would have selected this job for a run, but the
# difference of selected_workerUUIDs and excluded_workerUUIDs
# stopped it, it is responsible for verifying that the effective
@@ -250,26 +277,26 @@
class IAgent(zope.interface.common.sequence.IFiniteSequence):
"""Responsible for picking jobs and keeping track of them.
-
+
An agent is a persistent object in a queue that is associated with a
dispatcher and is responsible for picking jobs and keeping track of
them. Zero or more agents within a queue can be associated with a
- dispatcher.
-
+ dispatcher.
+
Each agent for a given dispatcher is identified uniquely with a
name. A fully (universally) unique identifier for the agent can be
obtained by combining the key of the agent's queue in the main queue
mapping at the ZODB root; the UUID of the agent's dispatcher; and
the agent's name.
"""
-
+
size = zope.interface.Attribute(
"""The maximum number of jobs this agent should have active at a time.
""")
name = zope.interface.Attribute(
"""The name for this agent. Unique within its dispatcher's jobs for
- its queue. Can be used to obtain agent with
+ its queue. Can be used to obtain agent with
queue.dispatchers[*dispatcher UUID*][*name*].""")
completed = zope.interface.Attribute(
@@ -305,9 +332,9 @@
Remember that IJobs are not guaranteed to be run in order
added to a queue. If you need sequencing, use
IJob.addCallbacks.
-
+
item must be an IJob, or be adaptable to that interface.
- begin_after must be None (to leave the job's current value) or a
+ begin_after must be None (to leave the job's current value) or a
datetime.datetime. begin_by must be None (to leave it alone) or a
datetime.timedelta of the duration after the begin_after.
@@ -326,11 +353,14 @@
queue on the `assignerUUID`.
"""
+ def putBack(item):
+ """Return a previously claimed job to the top of the queue."""
+
def pull(index=0):
"""Remove and return a job, by default from the front of the queue.
Raise IndexError if index does not exist.
-
+
This is the blessed way to remove an unclaimed job from the queue so
that dispatchers will not try to perform it.
"""
@@ -356,7 +386,7 @@
def ping(UUID):
"""responsible for setting ping time if necessary for this
dispatcher agent, and for decomissioning dead dispatchers for
- the next highest dispatcher (sorted by UUID) if its (last_ping +
+ the next highest dispatcher (sorted by UUID) if its (last_ping.value +
ping_interval + ping_death_interval) < now. If this is the
highest dispatcher UUID, cycles around to lowest."""
Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/job.py 2008-06-17 02:38:06 UTC (rev 87444)
@@ -77,35 +77,9 @@
elif status == zc.async.interfaces.CALLBACKS:
a.resumeCallbacks()
-class IRetries(zope.interface.Interface): # XXX move
- def jobError(failure, data_cache):
- """whether and how to retry after an error while performing job.
-
- return boolean as to whether to retry, or a datetime or timedelta to
- reschedule the job in the queue. An empty timedelta means to rescedule
- for immediately, before any pending calls in the queue."""
-
- def transactionError(failure, data_cache):
- """whether to retry after trying to commit a job's successful result.
-
- return boolean as to whether to retry, or a datetime or timedelta to
- reschedule the job in the queue. An empty timedelta means to rescedule
- for immediately, before any pending calls in the queue."""
-
- def interrupted():
- """whether to retry after a dispatcher dies when job was in progress.
-
- return boolean as to whether to retry, or a datetime or timedelta to
- reschedule the job in the queue. An empty timedelta means to rescedule
- for immediately, before any pending calls in the queue."""
-
- def updateData(data_cache):
- """right before committing a job, retry is given a chance to stash
- information it has saved in the data_cache."""
-
-class Retries(persistent.Persistent): # default for '' IRetries
+class RetryCommonFourTimes(persistent.Persistent): # default
zope.component.adapts(zc.async.interfaces.IJob)
- zope.interface.implements(zc.async.interfaces.IRetries)
+ zope.interface.implements(zc.async.interfaces.IRetryPolicy)
# exceptions, data_cache key, max retry, initial backoff seconds,
# incremental backoff seconds, max backoff seconds
@@ -116,13 +90,14 @@
5, 0, 0, 0),
)
transaction_exceptions = internal_exceptions
- max_interruptions = 10
+ max_interruptions = 9
+ log_every = 5
def __init__(self, job):
self.parent = self.__parent__ = job
self.data = BTrees.family32.OO.BTree()
- def updateData(data_cache):
+ def updateData(self, data_cache):
if 'first_active' in self.data and 'first_active' in data_cache:
data_cache.pop('first_active')
self.data.update(data_cache)
@@ -130,7 +105,7 @@
def jobError(self, failure, data_cache):
return self._process(failure, data_cache, self.internal_exceptions)
- def transactionError(self, failure, data_cache):
+ def commitError(self, failure, data_cache):
return self._process(failure, data_cache, self.transaction_exceptions)
def _process(self, failure, data_cache, exceptions):
@@ -139,7 +114,15 @@
if failure.check(*exc) is not None:
count = data_cache.get(key, 0) + 1
if max_count is not None and count >= max_count:
+ zc.async.utils.tracelog.info(
+ 'Retry policy for job %r is not retrying after %d '
+ 'counts of %s occurrences', self.parent, count, key)
return False
+ elif count==1 or not count % self.log_every:
+ zc.async.utils.tracelog.info(
+ 'Retry policy for job %r requests another attempt '
+ 'after %d counts of %s occurrences', self.parent,
+ count, key)
backoff = min(max_backoff,
(init_backoff + (count-1) * incr_backoff))
if backoff:
@@ -154,23 +137,34 @@
def interrupted(self):
if 'first_active' not in self.data:
self.data['first_active'] = self.parent.active_start
- ct = self.data['interruptions'] = self.data.get('interruptions', 0) + 1
- return self.max_interruptions is None or ct <= self.max_interruptions
+ count = self.data['interruptions'] = self.data.get('interruptions', 0) + 1
+ if self.max_interruptions is None or count <= self.max_interruptions:
+ if count==1 or not count % self.log_every:
+ zc.async.utils.tracelog.info(
+ 'Retry policy for job %r requests another attempt '
+ 'after %d interrupts', self.parent, count)
+ return True
+ else:
+ zc.async.utils.tracelog.info(
+ 'Retry policy for job %r is not retrying after %d '
+ 'interrupts', self.parent, count)
+ return False
-class RetrySystemErrorsForever(Retries): # default for 'callback' IRetries
+
+class RetryCommonForever(RetryCommonFourTimes):
# retry on ZEO failures and Transaction errors during the job forever
- # retry on transactionErrors and interrupteds forever.
+ # retry on commitErrors and interrupteds forever.
internal_exceptions = (
((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
None, 5, 5, 60),
((ZODB.POSException.TransactionError,), 'transaction_error',
None, 0, 0, 0),
)
-
+
max_interruptions = None
- def transactionError(self, failure, data_cache):
- res = super(RetryForever, self).transactionError(failure, data_cache)
+ def commitError(self, failure, data_cache):
+ res = super(RetryCommonForever, self).commitError(failure, data_cache)
if not res:
# that just means we didn't record it. We actually are going to
# retry.
@@ -181,6 +175,33 @@
data_cache['first_active'] = self.parent.active_start
return True # always retry
+class NeverRetry(persistent.Persistent):
+ zope.component.adapts(zc.async.interfaces.IJob)
+ zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+
+ def __init__(self, job):
+ self.parent = self.__parent__ = job
+
+ def updateData(self, data_cache):
+ pass
+
+ def jobError(self, failure, data_cache):
+ return False
+
+ def commitError(self, failure, data_cache):
+ return False
+
+ def interrupted(self):
+ return False
+
+def callback_retry_policy_factory(job):
+ res = zope.component.queryAdapter(
+ job, zc.async.interfaces.IRetryPolicy, 'callback')
+ if res is None:
+ res = RetryCommonForever(job)
+ return res
+
+
class Job(zc.async.utils.Base):
zope.interface.implements(zc.async.interfaces.IJob)
@@ -189,11 +210,31 @@
_status = zc.async.interfaces.NEW
_begin_after = _begin_by = _active_start = _active_end = None
key = None
- # default_retry_policy and retry_policy should either be name to adapt job
- # to IRetries, or factory, or None.
- default_retry_policy = ''
- retry_policy = None
- retries = None
+ retry_policy_factory = None
+ _retry_policy = None
+ def getRetryPolicy(self):
+ if self._retry_policy is not None:
+ return self._retry_policy
+ if self.retry_policy_factory is None:
+ # first try to look up adapter with name of ''; then if that fails
+ # use RetryCommonFourTimes
+ res = zope.component.queryAdapter(
+ self, zc.async.interfaces.IRetryPolicy, '')
+ if res is None:
+ res = RetryCommonFourTimes(self)
+ elif isinstance(self.retry_policy_factory, basestring):
+ res = zope.component.getAdapter(
+ self, zc.async.interfaces.IRetryPolicy,
+ self.retry_policy_factory)
+ # this may cause an error. We can't proceed because we don't know
+ # what to do, and it may be *critical* to know. Therefore, in
+ # _getRetry, we rely on never_fail to keep on sending critical
+ # errors in the log, and never stopping.
+ else:
+ res = self.retry_policy_factory(self)
+ self._retry_policy = res
+ return res
+
default_error_log_level = logging.ERROR
error_log_level = None
@@ -203,12 +244,6 @@
return self.default_error_log_level
return self.error_log_level
- @property
- def effective_retry_policy(self):
- if self.retry_policy is None:
- return self.default_retry_policy
- return self.retry_policy
-
assignerUUID = None
_quota_names = ()
@@ -371,24 +406,28 @@
self._callable_root.parent = self
def addCallbacks(self, success=None, failure=None,
- error_log_level=None, retry_policy=None):
+ error_log_level=None, retry_policy_factory=None):
if success is not None or failure is not None:
if success is not None:
success = zc.async.interfaces.IJob(success)
success.default_error_log_level = logging.CRITICAL
if error_log_level is not None:
success.error_log_level = error_log_level
- success.default_retry_policy = 'callback'
- if retry_policy is not None:
- success.retry_policy = retry_policy
+ if retry_policy_factory is not None:
+ success.retry_policy_factory = retry_policy_factory
+ elif success.retry_policy_factory is None:
+ success.retry_policy_factory = (
+ callback_retry_policy_factory)
if failure is not None:
failure = zc.async.interfaces.IJob(failure)
failure.default_error_log_level = logging.CRITICAL
if error_log_level is not None:
failure.error_log_level = error_log_level
- failure.default_retry_policy = 'callback'
- if retry_policy is not None:
- failure.retry_policy = retry_policy
+ if retry_policy_factory is not None:
+ failure.retry_policy_factory = retry_policy_factory
+ elif failure.retry_policy_factory is None:
+ failure.retry_policy_factory = (
+ callback_retry_policy_factory)
res = Job(success_or_failure, success, failure)
if success is not None:
success.parent = res
@@ -400,18 +439,17 @@
abort_handler = zc.async.interfaces.IJob(
completeStartedJobArguments)
abort_handler.args.append(res)
- res.addCallback(abort_handler, error_log_level)
+ res.addCallback(
+ abort_handler, error_log_level, retry_policy_factory)
abort_handler.default_error_log_level = logging.CRITICAL
if error_log_level is not None:
abort_handler.error_log_level = error_log_level
- abort_handler.default_retry_policy = 'callback'
- if retry_policy is not None:
- abort_handler.retry_policy = retry_policy
else:
res = self
return res
- def addCallback(self, callback, error_log_level=None, retry_policy=None):
+ def addCallback(self, callback, error_log_level=None,
+ retry_policy_factory=None):
callback = zc.async.interfaces.IJob(callback)
self.callbacks.put(callback)
callback.parent = self
@@ -423,35 +461,29 @@
callback.default_error_log_level = logging.CRITICAL
if error_log_level is not None:
callback.error_log_level = error_log_level
- callback.default_retry_policy = 'callback'
- if retry_policy is not None:
- callback.retry_policy = retry_policy
+ if retry_policy_factory is not None:
+ callback.retry_policy_factory = retry_policy_factory
+ elif callback.retry_policy_factory is None:
+ callback.retry_policy_factory = callback_retry_policy_factory
return callback
def _getRetry(self, call_name, tm, *args):
- def getRetry():
- retries = self.retries
- if retries is None:
- retry_policy = self.effective_retry_policy
- if retry_policy is None:
- return None # means, do not retry ever
- elif isinstance(retry_policy, basestring):
- retries = zope.component.getAdapter(
- self, zc.async.interfaces.IRetries,
- name=retry_policy)
- else:
- retries = retry_policy(self)
- if retries is not None:
- self.retries = retries
- call = getattr(retries, call_name, None)
- if call is None:
- zc.async.utils.log.error(
- 'retries %r for %r does not have required %s method',
- retries, self, call_name)
- return None
- return call(*args)
- identifier = 'getting %s retry for %r' % (call_name, self)
- return zc.async.utils.never_fail(getRetry, identifier, tm)
+ # if we are after the time that we are supposed to begin_by, no retry
+ if (self.begin_by is not None and self.begin_after is not None and
+ self.begin_after + self.begin_by < datetime.datetime.now(pytz.UTC)):
+ return False
+ # we divide up the two ``never_fail`` calls so that retries in getting
+ # the policy don't affect actually calling the method.
+ identifier = 'getting retry policy for %r' % (self,)
+ policy = zc.async.utils.never_fail(self.getRetryPolicy, identifier, tm)
+ call = getattr(policy, call_name, None)
+ if call is None:
+ zc.async.utils.log.error(
+ 'retry policy %r for %r does not have required %s method',
+ policy, self, call_name)
+ return None
+ identifier = 'getting result for %s retry for %r' % (call_name, self)
+ return zc.async.utils.never_fail(lambda: call(*args), identifier, tm)
def __call__(self, *args, **kwargs):
if self.status not in (zc.async.interfaces.NEW,
@@ -459,16 +491,20 @@
raise zc.async.interfaces.BadStatusError(
'can only call a job with NEW or ASSIGNED status')
tm = transaction.interfaces.ITransactionManager(self)
- self._status = zc.async.interfaces.ACTIVE
- self._active_start = datetime.datetime.now(pytz.UTC)
- tm.commit()
- effective_args = list(args)
- effective_args[0:0] = self.args
- effective_kwargs = dict(self.kwargs)
- effective_kwargs.update(kwargs)
+ def prepare():
+ self._status = zc.async.interfaces.ACTIVE
+ self._active_start = datetime.datetime.now(pytz.UTC)
+ effective_args = list(args)
+ effective_args[0:0] = self.args
+ effective_kwargs = dict(self.kwargs)
+ effective_kwargs.update(kwargs)
+ return effective_args, effective_kwargs
+ identifier = 'preparing for call of %r' % (self,)
+ effective_args, effective_kwargs = zc.async.utils.never_fail(
+ prepare, identifier, tm)
# this is the calling code. It is complex and long because it is
# trying both to handle exceptions reasonably, and to honor the
- # IRetries interface for those exceptions.
+ # IRetryPolicy interface for those exceptions.
data_cache = {}
res = None
while 1:
@@ -484,107 +520,172 @@
if isinstance(retry, (datetime.timedelta, datetime.datetime)):
identifier = (
'rescheduling %r as requested by '
- 'associated IRetries %r' % (
- self, self.retries))
+ 'associated IRetryPolicy %r' % (
+ self, self.getRetryPolicy()))
if self is zc.async.utils.never_fail(
lambda: self._reschedule(retry, data_cache),
identifier, tm):
return self
elif retry:
continue
+ # policy didn't exist or returned False or couldn't reschedule
try:
- self._set_result(res)
+ callback = self._set_result(res, tm, data_cache)
except zc.async.utils.EXPLOSIVE_ERRORS:
tm.abort()
raise
except:
+ failure = zc.twist.Failure()
+ tm.abort()
+ retry = self._getRetry('commitError', tm, failure, data_cache)
+ if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+ identifier = (
+ 'rescheduling %r as requested by '
+ 'associated IRetryPolicy %r' % (
+ self, self.getRetryPolicy()))
+ if self is zc.async.utils.never_fail(
+ lambda: self._reschedule(retry, data_cache),
+ identifier, tm):
+ return self
+ elif retry:
+ continue
+ # policy didn't exist or returned False or couldn't reschedule
if isinstance(res, twisted.python.failure.Failure):
zc.async.utils.log.log(
self.effective_error_log_level,
'Commit failed for %r (see subsequent traceback). '
- 'Prior to this, job originally failed with '
- 'traceback:\n%s',
+ 'Prior to this, job failed with traceback:\n%s',
self,
res.getTraceback(
elideFrameworkCode=True, detail='verbose'))
else:
- zc.async.utils.tracelog.info(
+ zc.async.utils.log.info(
'Commit failed for %r (see subsequent traceback). '
'Prior to this, job succeeded with result: %r',
self, res)
- res = zc.twist.Failure()
- tm.abort()
- retry = self._getRetry('jobError', tm, res, data_cache)
- if isinstance(retry, (datetime.timedelta, datetime.datetime)):
- identifier = (
- 'rescheduling %r as requested by '
- 'associated IRetries %r' % (
- self, self.retries))
- if self is zc.async.utils.never_fail(
- lambda: self._reschedule(retry, data_cache),
- identifier, tm):
- return self
- elif retry:
- continue
- # retries didn't exist or returned False
+ res = failure
def complete():
self._result = res
self._status = zc.async.interfaces.CALLBACKS
self._active_end = datetime.datetime.now(pytz.UTC)
- if self.retries is not None:
- self.retries.updateData(data_cache)
- identifier = ('storing failure at commit of %r' % (self,))
+ policy = self.getRetryPolicy()
+ if data_cache and self._retry_policy is not None:
+ self._retry_policy.updateData(data_cache)
+ identifier = 'storing failure at commit of %r' % (self,)
zc.async.utils.never_fail(complete, identifier, tm)
- self._complete(res)
+ callback = True
+ if callback:
+ self._log_completion(res)
+ identifier = 'performing callbacks of %r' % (self,)
+ zc.async.utils.never_fail(self.resumeCallbacks, identifier, tm)
return res
def handleInterrupt(self):
- # this is called either within a job (that has a never fail policy)
- # or withing _resumeCallbacks (that uses never_fail)
- if self.status is not zc.async.interfaces.ACTIVE:
+ # should be called within a job that has a RetryCommonForever policy
+ tm = transaction.interfaces.ITransactionManager(self)
+ if self.status == zc.async.interfaces.ACTIVE:
+ retry = self._getRetry('interrupted', tm)
+ if isinstance(retry, (datetime.datetime, datetime.timedelta)):
+ self._reschedule(retry, queue=self.queue)
+ elif retry:
+ self._reschedule(datetime.timedelta(), queue=self.queue)
+ else:
+ res = zc.twist.Failure(zc.async.interfaces.AbortedError())
+ if self._set_result(res, tm):
+ self.resumeCallbacks()
+ self._log_completion(res)
+ elif self.status != zc.async.interfaces.CALLBACKS:
+ # we have to allow CALLBACKS or else some retries will fall over,
+ # because handleInterrupt may fail after a commit of the aborted
+ # error
raise zc.async.interfaces.BadStatusError(
'can only call ``handleInterrupt`` on a job with ACTIVE '
- 'status')
- tm = transaction.interfaces.ITransactionManager(self)
- retry = self._getRetry('interrupted', tm)
- if retry:
- if not isinstance(retry, (datetime.timedelta, datetime.datetime)):
- retry = datetime.timedelta()
- self._reschedule(retry)
+ 'status') # um...or CALLBACKS, but that's a secret :-D
else:
- self.fail()
+ self.resumeCallbacks()
- def _reschedule(self, retry, data_cache=None):
- if not zc.async.interfaces.IAgent.providedBy(self.parent):
- zc.async.utils.log.error(
- 'error for IRetries %r on %r: '
- 'can only reschedule a job directly in an agent',
- self.retries, self)
- return None
+ def fail(self, e=None):
+ # something may have fallen over the last time this was called, so we
+ # are careful to only store the error if we're not in the CALLBACKS
+ # status.
+ callback = True
+ status = self.status
+ if status in (zc.async.interfaces.COMPLETED,
+ zc.async.interfaces.ACTIVE):
+ raise zc.async.interfaces.BadStatusError(
+ 'can only call fail on a job with NEW, PENDING, or ASSIGNED '
+ 'status') # ...or CALLBACKS, but that's because of
+ # retries, and is semantically incorrect
+ if status != zc.async.interfaces.CALLBACKS:
+ if e is None:
+ e = zc.async.interfaces.TimeoutError()
+ res = zc.twist.Failure(e)
+ callback = self._set_result(
+ res, transaction.interfaces.ITransactionManager(self))
+ self._log_completion(res)
+ if callback:
+ self.resumeCallbacks()
+
+ def _reschedule(self, when, data_cache=None, queue=None):
+ if not isinstance(when, (datetime.datetime, datetime.timedelta)):
+ raise TypeError('``when`` must be datetime or timedelta')
+ in_agent = zc.async.interfaces.IAgent.providedBy(self.parent)
+ if queue is None:
+ # this is a reschedule from jobError or commitError
+ if not in_agent:
+ zc.async.utils.log.critical(
+ 'error for IRetryPolicy %r on %r: '
+ 'can only reschedule a job directly in an agent',
+ self.getRetryPolicy(), self)
+ return None
+ queue = self.queue
+ if data_cache is not None and self._retry_policy is not None:
+ self._retry_policy.updateData(data_cache)
self._status = zc.async.interfaces.NEW
- del self._active_start
- if data_cache is not None and self.retries is not None:
- self.retries.updateData(data_cache)
- self.parent.reschedule(self, retry)
+ self._active_start = None
+ if in_agent:
+ self.parent.remove(self)
+ else:
+ self.parent = None
+ now = datetime.datetime.now(pytz.UTC)
+ if isinstance(when, datetime.datetime):
+ if when.tzinfo is None:
+ when = when.replace(tzinfo=pytz.UTC)
+ if when <= now:
+ queue.putBack(self)
+ else:
+ queue.put(self, begin_after=when)
+ elif isinstance(when, datetime.timedelta):
+ if when <= datetime.timedelta():
+ queue.putBack(self)
+ else:
+ queue.put(self, begin_after=now+when)
return self
- def _set_result(self, res):
+ def _set_result(self, res, tm, data_cache=None):
+ # returns whether to call ``resumeCallbacks``
+ callback = True
if zc.async.interfaces.IJob.providedBy(res):
res.addCallback(self._callback)
+ callback = False
elif isinstance(res, twisted.internet.defer.Deferred):
res.addBoth(zc.twist.Partial(self._callback))
- # XXX need to tell Partial to retry forever
+ # TODO need to tell Partial to retry forever: currently no
+ # spelling for this in zc.twist. This makes using deferreds
+ # less reliable: _callback *must* be called.
+ callback = False
else:
if isinstance(res, twisted.python.failure.Failure):
res = zc.twist.sanitize(res)
self._result = res
self._status = zc.async.interfaces.CALLBACKS
self._active_end = datetime.datetime.now(pytz.UTC)
- if self.retries is not None:
- self.retries.updateData(data_cache)
+ if self._retry_policy is not None and data_cache:
+ self._retry_policy.updateData(data_cache)
tm.commit()
+ return callback
- def _complete(self, res):
+ def _log_completion(self, res):
if isinstance(res, twisted.python.failure.Failure):
zc.async.utils.log.log(
self.effective_error_log_level,
@@ -596,48 +697,62 @@
zc.async.utils.tracelog.info(
'%r succeeded with result: %r',
self, res)
- self.resumeCallbacks()
def _callback(self, res):
# done within a job or partial, so we can rely on their retry bits to
# some degree. However, we commit transactions ourselves, so we have
# to be a bit careful that the result hasn't been set already.
+ callback = True
if self._status == zc.async.interfaces.ACTIVE:
- self._set_result(res)
- self._complete(res)
+ callback = self._set_result(
+ res, transaction.interfaces.ITransactionManager(self))
+ self._log_completion(res)
+ if callback:
+ self.resumeCallbacks()
- def fail(self, e=None):
- if e is None:
- e = zc.async.interfaces.AbortedError()
- if self._status not in (zc.async.interfaces.NEW,
- zc.async.interfaces.ACTIVE):
- raise zc.async.interfaces.BadStatusError(
- 'can only call fail on a job with NEW, PENDING, ASSIGNED, or '
- 'ACTIVE status')
- self._complete(zc.twist.Failure(e))
-
def resumeCallbacks(self):
+ # should be called within a job that has a RetryCommonForever policy
if self._status != zc.async.interfaces.CALLBACKS:
raise zc.async.interfaces.BadStatusError(
'can only resumeCallbacks on a job with CALLBACKS status')
- identifier = 'performing callbacks for %r' % (self,)
- tm = transaction.interfaces.ITransactionManager(self)
- zc.async.utils.never_fail(self._resumeCallbacks, identifier, tm)
-
- def _resumeCallbacks(self):
callbacks = list(self.callbacks)
tm = transaction.interfaces.ITransactionManager(self)
length = 0
while 1:
for j in callbacks:
+ # TODO yuck: this mucks in callbacks' protected bits
if j._status == zc.async.interfaces.NEW:
- zc.async.utils.tracelog.debug(
- 'starting callback %r to %r', j, self)
- j(self.result)
+ if (j.begin_by is not None and
+ (j.begin_after + j.begin_by) <
+ datetime.datetime.now(pytz.UTC)):
+ zc.async.utils.log.error(
+ 'failing expired callback %r to %r', j, self)
+ j.fail()
+ else:
+ zc.async.utils.tracelog.debug(
+ 'starting callback %r to %r', j, self)
+ j(self.result)
elif j._status == zc.async.interfaces.ACTIVE:
- zc.async.utils.tracelog.debug(
- 'failing aborted callback %r to %r', j, self)
- j.handleInterrupt()
+ retry = j._getRetry('interrupted', tm)
+ istime = isinstance(
+ retry, (datetime.timedelta, datetime.datetime))
+ if istime:
+ zc.async.utils.log.error(
+ 'error for IRetryPolicy %r on %r: '
+ 'cannot reschedule a callback, only retry',
+ j.getRetryPolicy(), j)
+ if retry or istime:
+ zc.async.utils.tracelog.debug(
+ 'retrying interrupted callback '
+ '%r to %r', j, self)
+ j._status = zc.async.interfaces.NEW
+ j._active_start = None
+ j(self.result)
+ else:
+ zc.async.utils.tracelog.debug(
+ 'aborting interrupted callback '
+ '%r to %r', j, self)
+ j.fail(zc.async.interfaces.AbortedError())
elif j._status == zc.async.interfaces.CALLBACKS:
j.resumeCallbacks()
# TODO: this shouldn't raise anything we want to catch, right?
@@ -654,4 +769,4 @@
if zc.async.interfaces.IAgent.providedBy(self.parent):
self.parent.jobCompleted(self)
tm.commit()
-
+ return
Modified: zc.async/branches/dev/src/zc/async/job.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/job.txt 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/job.txt 2008-06-17 02:38:06 UTC (rev 87444)
@@ -2,34 +2,34 @@
Jobs
====
-What if you want to persist a reference to the method of a persistent
-object--you can't persist that normally in the ZODB, but that can be
-very useful, especially to store asynchronous calls. What if you want
-to act on the result of an asynchronous call that may be called later?
-The zc.async package offers an approach that combines ideas of a partial
-and that of Twisted deferred code: ``zc.async.job.Job``.
+What if you want to persist a reference to the method of a persistent object?
+You can't persist that normally in the ZODB, but that can be very useful,
+especially to store asynchronous calls. What if you want to act on the result
+of an asynchronous call that may be called later?
-To use it, simply wrap the callable--a method of a persistent object or
-a callable persistent object or a global function--in the job. You can
-include ordered and keyword arguments to the job, which may be
-persistent objects or simply pickleable objects.
+The zc.async package offers an approach that combines the ideas of a partial
+and a Twisted deferred: ``zc.async.job.Job``. It has code that is specific
+to zc.async, so it is not truly a general-purpose persistent partial, but this
+file shows the Job largely stand-alone.
-Unlike a partial but like a Twisted deferred, the result of the wrapped
-call goes on the job's ``result`` attribute, and the immediate return of
-the call might not be the job's end result. It could also be a failure,
-indicating an exception; or another partial, indicating that we are
-waiting to be called back by the second partial; or a twisted deferred,
-indicating that we are waiting to be called back by a twisted Deferred
-(see the ``zc.twist``). After you have the partial, you can then use a
-number of methods and attributes on the partial for further set up.
-Let's show the most basic use first, though.
+To use a job, simply wrap the callable--a method of a persistent object or a
+callable persistent object or a global function--in the job. You can include
+ordered and keyword arguments to the job, which may be persistent objects or
+simply pickleable objects.
-Note that, even though this looks like an interactive prompt, all
-functions and classes defined in this document act as if they were
-defined within a module. Classes and functions defined in an interactive
-prompt are normally not picklable, and Jobs must work with
-picklable objects [#set_up]_.
+Unlike a partial but like a Twisted deferred, the result of the wrapped call
+goes on the job's ``result`` attribute. It could also be a failure, indicating
+an exception; or, temporarily, None, indicating that we are waiting to be called
+back by a second Job or a Twisted Deferred (see the ``zc.twist`` package).
+After you have the job, you can then use a number of methods and attributes
+on the job for further set up. Let's show the most basic use first, though.
+
+(Note that, even though this looks like an interactive prompt, all functions and
+classes defined in this document act as if they were defined within a module.
+Classes and functions defined in an interactive prompt are normally not
+picklable, and Jobs must work with picklable objects. [#set_up]_.)
+
>>> import zc.async.job
>>> def call():
... print 'hello world'
@@ -52,8 +52,8 @@
>>> j.status == zc.async.interfaces.NEW
True
-We can call the job from the NEW (or ASSIGNED, see later) status, and
-then see that the function was called, and see the result on the partial.
+We can call the job from the NEW (or PENDING or ASSIGNED, see later) status,
+and then see that the function was called, and see the result on the job.
>>> res = j()
hello world
@@ -69,8 +69,8 @@
'my result'
In addition to using a global function, we can also use a method of a
-persistent object. Imagine we have a ZODB root that we can put objects
-in to.
+persistent object. (For this example, imagine we have a ZODB root into which we
+can put objects.)
>>> import persistent
>>> class Demo(persistent.Persistent):
@@ -117,13 +117,13 @@
...
exceptions.RuntimeError: Bad Things Happened Here
-Note that all calls can return a failure explicitly, rather than raising
-an exception that the job converts to an exception. However, there
-is an important difference in behavior. If a wrapped call raises an
-exception, the job aborts the transaction; but if the wrapped call
-returns a failure, no abort occurs. Wrapped calls that explicitly return
-failures are thus responsible for any necessary transaction aborts. See
-the footnote for an example [#explicit_failure_example]_.
+Note that all calls can return a failure explicitly, rather than raising an
+exception that the job converts to an exception. However, there is an important
+difference in behavior. If a wrapped call raises an exception, the job aborts
+the transaction; but if the wrapped call returns a failure, no automatic abort
+occurs. Wrapped calls that explicitly return failures are thus responsible for
+any necessary transaction aborts. See the footnote for an example
+[#explicit_failure_example]_.
Now let's return a job from the job. This generally represents a result
that is waiting on another asynchronous persistent call, which would
@@ -236,23 +236,24 @@
topic.
Callbacks
----------
+=========
-The job object can also be used to handle return values and
-exceptions from the call. The ``addCallbacks`` method enables the
-functionality. Its signature is (success=None, failure=None). It may
-be called multiple times, each time adding a success and/or failure
-callable that takes an end result: a value or a zc.async.Failure object,
-respectively. Failure objects are passed to failure callables, and
-any other results are passed to success callables.
+The job object can also be used to handle return values and exceptions from the
+call. The ``addCallbacks`` method enables the functionality. Its signature is
+(success=None, failure=None). It may be called multiple times, each time adding
+a success and/or failure callable that takes the end result of the original,
+parent job: a value or a zc.async.Failure object, respectively. Failure objects
+are passed to failure callables, and any other results are passed to success
+callables.
-The return value of the success and failure callables is
-important for chains and for determining whether a job had any
-errors that need logging, as we'll see below. The call to
-``addCallbacks`` returns a job, which can be used for chaining (see
-``Chaining Callbacks``_).
+Note that, unlike with Twisted deferreds, the results of callbacks for a given
+job are not chained. To chain, add a callback to the desired callback. The
+primary reason for chaining Twisted callbacks is try:except:else:finally logic. In
+most cases, because zc.async jobs are long-running, the try:except logic can be
+accomplished within the code for the job itself. For certain kinds of
+exceptions, an IRetryPolicy is used instead.
-Let's look at a simple example.
+Let's look at a simple example of a callback.
>>> def call(*args):
... res = 1
@@ -333,6 +334,7 @@
failure. [Failure instance: Traceback: exceptions.TypeError...]
also a failure. [Failure instance: Traceback: exceptions.TypeError...]
+------------------
Chaining Callbacks
------------------
@@ -384,6 +386,7 @@
>>> isinstance(j.result, twisted.python.failure.Failure)
True
+--------------------------
Callbacks on Completed Job
--------------------------
@@ -403,6 +406,7 @@
>>> j.status == zc.async.interfaces.COMPLETED
True
+-------------
Chaining Jobs
-------------
@@ -422,33 +426,68 @@
success! 15
also a success! 60
-Failing
--------
+Failures
+========
-Speaking again of failures, it's worth discussing two other aspects of
-failing. One is that jobs offer an explicit way to fail a call. It
-can be called when the job has a NEW, PENDING, ASSIGNED or ACTIVE status.
-The primary use cases for this method are to cancel a job that is
-overdue to start, and to cancel a job that was in progress by a
-worker thread in a dispatcher when the dispatcher died (more on that below).
+Let's talk more about failures. We'll divide them into two broad types.
+- The code you ran in the job failed.
+
+- The system around your code (zc.async code, the ZODB, or even the machine on
+ which the job is running) failed.
+
+If your code fails, it's mostly up to you to deal with it. There are two
+mechanisms to use: callbacks, which we've already seen, and retry policies,
+which we'll look at below.
+
+If the system fails around your code, four sorts of things might happen. In the
+context of the full zc.async system, it is the responsibility of the zc.async
+code to react as described below, and, when pertinent, your responsibility to
+make sure that the retry policy for the job does what you need.
+
+- The job never started, and timed out. zc.async should call ``fail`` on the
+ job, which aborts the job and begins callbacks. (This is handled in the
+ queue's ``claim`` method, in the full context of zc.async.)
+
+- The job started but was unable to commit. Internally to the job's machinery,
+ the job uses a retry policy to decide what to do, as described below.
+
+- The job started but then was interrupted. zc.async should call
+ ``handleInterrupt``, in which a retry policy decides what to do, as described
+ below. (This is handled in a DispatcherAgents collection's ``deactivate``
+ method, in the full context of zc.async.)
+
+- The job completed, but the callbacks were interrupted. zc.async should call
+ ``resumeCallbacks``, which will handle the remaining callbacks in the manner
+ described here. (This is handled in a DispatcherAgents collection's
+ ``deactivate`` method, in the full context of zc.async.)
+
+We need to explore a few elements of this, then: the ``fail`` method, retry
+policies, job errors, commit errors, and the ``handleInterrupt`` method.
+
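Before looking at each element, it may help to see the shape of the loop that
ties them together. This standalone sketch is a heavily simplified assumption
about ``Job.__call__``, not the real implementation: the real loop also handles
datetime/timedelta return values by rescheduling, wraps each step in
``never_fail``, and records real ``zc.twist.Failure`` objects. The names
``run_with_retries`` and ``work`` are invented for the sketch.

```python
def run_with_retries(work, policy, commit):
    """Simplified sketch of the job-call retry loop (illustrative only).

    ``work()`` is the wrapped call, ``policy`` supplies ``jobError`` and
    ``commitError``, and ``commit()`` commits the transaction.  A policy
    returning True means "retry now"; False means "give up".  Returning a
    datetime/timedelta would mean "reschedule", which is elided here.
    """
    data_cache = {}  # scratch state shared across retries of this call
    while True:
        try:
            res = work()
        except Exception:
            if policy.jobError(None, data_cache):
                continue  # policy asked for an immediate retry
            res = 'failure'  # stands in for a zc.twist.Failure
        try:
            commit()
        except Exception:
            if policy.commitError(None, data_cache):
                continue  # retry the whole call after a failed commit
            res = 'failure'
        return res
```

Because ``data_cache`` is a plain in-memory dict, transaction aborts between
attempts do not erase the retry history the policy has accumulated.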
+--------
+``fail``
+--------
+
+The ``fail`` method is an explicit way to fail a job. It can be called when the
+job has a NEW, PENDING, or ASSIGNED status. The use case for this method is to
+cancel a job that is overdue to start. It defaults to a TimeoutError.
+
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> transaction.commit()
>>> j.fail()
>>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- Traceback (most recent call last):
- ...
- zc.async.interfaces.AbortedError:
+ Traceback...zc.async.interfaces.TimeoutError...
-``fail`` calls all failure callbacks with the failure.
+``fail`` calls all callbacks with the failure.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> j_callback = j.addCallbacks(failure=failure)
>>> transaction.commit()
>>> res = j.fail() # doctest: +ELLIPSIS
- failure. [Failure instance: Traceback...zc.async.interfaces.AbortedError...]
+ failure. [Failure instance: Traceback...zc.async.interfaces.TimeoutError...]
-As seen above, it fails with zc.async.interfaces.AbortedError by default.
+As seen above, it fails with zc.async.interfaces.TimeoutError by default.
You can also pass in a different error.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
@@ -459,43 +498,856 @@
...
exceptions.RuntimeError: failed
-As mentioned, if a dispatcher dies when working on an active task, the
-active task should be aborted using ``fail``, so the method also works if
-a job has the ACTIVE status. We'll reach under the covers to show this.
+It won't work for failing tasks in ACTIVE or COMPLETED status.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> j._status = zc.async.interfaces.ACTIVE
>>> transaction.commit()
>>> j.fail()
- >>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
- zc.async.interfaces.AbortedError:
+ BadStatusError: can only call fail on a job with NEW, PENDING, or ASSIGNED status
-It won't work for failing tasks in COMPLETED or CALLBACKS status.
-
+ >>> j._status = zc.async.interfaces.NEW
>>> j.fail()
+ >>> j.status == zc.async.interfaces.COMPLETED
+ True
+ >>> j.fail()
Traceback (most recent call last):
...
- BadStatusError: can only call fail on a job with NEW, PENDING, ASSIGNED, or ACTIVE status
+ BadStatusError: can only call fail on a job with NEW, PENDING, or ASSIGNED status
+
+It's a dirty secret that it will work in CALLBACKS status. This is because we
+need to support retries in the face of internal commits.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> j._status = zc.async.interfaces.CALLBACKS
+ >>> transaction.commit()
>>> j.fail()
- Traceback (most recent call last):
+ >>> j.status == zc.async.interfaces.COMPLETED
+ True
+ >>> j.result is None # the default
+ True
+
+--------------
+Retry Policies
+--------------
+
+All of the other failure situations touch on retry policies, directly or
+indirectly.
+
+What is a retry policy? It is used in three circumstances.
+
+- When the code the job ran fails, the job will try to call
+ ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to whether
+ the job should be retried.
+
+- When the commit fails, the job will try to call
+ ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
+ whether the job should be retried.
+
+- When the job starts but fails to complete because the system is interrupted,
+ the job will try to call ``retry_policy.interrupted()`` to get a boolean as
+ to whether the job should be retried.
+
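The three hooks above can be sketched as a small, self-contained policy. This
is an illustration only, not shipped code: a real policy, like the ones shown
below, subclasses ``persistent.Persistent`` and declares
``zc.async.interfaces.IRetryPolicy``; the ``max_retries`` parameter and the
combined error count here are assumptions modeled on ``RetryCommonFourTimes``.

```python
class CountingRetryPolicy(object):
    """Sketch of the IRetryPolicy contract (illustrative, not shipped code).

    The job calls ``jobError``/``commitError`` with a failure and a scratch
    ``data_cache`` dict; it calls ``updateData`` before a successful commit
    so the policy can persist what the cache accumulated.
    """

    def __init__(self, job, max_retries=4):
        self.parent = self.__parent__ = job
        self.max_retries = max_retries
        self.data = {}  # stands in for persisted state on the policy

    def _count(self, data_cache):
        # seed the scratch cache from persisted state, then count an attempt
        data_cache.setdefault('errors', self.data.get('errors', 0))
        data_cache['errors'] += 1
        return data_cache['errors'] <= self.max_retries

    def jobError(self, failure, data_cache):
        return self._count(data_cache)

    def commitError(self, failure, data_cache):
        return self._count(data_cache)

    def interrupted(self):
        return True  # simply try again after an interruption

    def updateData(self, data_cache):
        self.data.update(data_cache)  # called before a commit succeeds
```

Keeping the count in the plain ``data_cache`` dict means transaction aborts
between retries do not wipe it; ``updateData`` moves it into (notionally
persistent) state once a commit is going to succeed.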
+Why does this need to be a policy? Can't it be a simpler arrangement?
+
+The heart of the problem is that different jobs need different error
+resolutions.
+
+In some cases, jobs may not be fully transactional. For instance, the job
+may be communicating with an external system, such as a credit card system.
+The retry policy here should typically be "never": perhaps a callback should be
+in charge of determining what to do next.
+
+If a job is fully transactional, it can be retried. But even then the desired
+behavior may differ.
+
+- In typical cases, some errors should simply cause a failure, while other
+ errors, such as database conflict errors, should cause a limited number of
+ retries.
+
+- In some jobs, conflict errors should be retried forever, because the job must
+ be run to completion or else the system should fall over. Callbacks that try
+ to handle errors themselves may take this approach, for instance.
+
+zc.async currently ships with three retry policies.
+
+1. The default, appropriate for most fully transactional jobs, is the
+ zc.async.job.RetryCommonFourTimes.
+
+2. The other available (pre-written) option for transactional jobs is
+ zc.async.job.RetryCommonForever. Callbacks will get this policy by
+ default.
+
+Both of these policies retry ZEO disconnects forever. They retry interrupts
+and transaction errors such as conflicts either four times (for a total of
+five attempts) or forever, respectively.
+
+3. The last retry policy is zc.async.job.NeverRetry. This is appropriate for
+ non-transactional jobs. You'll still typically need to handle errors in
+ your callbacks.
+
+Let's take a detailed look at these three in isolation before seeing them in
+practice below.
+
+We'll start with the default retry policy, RetryCommonFourTimes. We can get it
+with ``getRetryPolicy``.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+ True
+ >>> verifyObject(zc.async.interfaces.IRetryPolicy, policy)
+ True
+
+Now we'll try out a few calls. ``jobError`` and ``commitError`` both take a
+failure and a dictionary in which to stash data about the retry. Before each
+commit, the job passes that dictionary back to the retry policy's
+``updateData`` method.
+
+Here's the policy requesting that a job be retried up to a total of five
+attempts, counting errors raised both in the job and in the commit.
+
+ >>> import zc.twist
+ >>> import ZODB.POSException
+ >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+ >>> data = {}
+
+ >>> policy.jobError(conflict, data)
+ True
+ >>> policy.jobError(conflict, data)
+ True
+ >>> policy.jobError(conflict, data)
+ True
+ >>> policy.jobError(conflict, data)
+ True
+ >>> policy.jobError(conflict, data)
+ False
+
+Now we've exhausted the total number of retries for this kind of error, so
+during commit it will fail immediately.
+
+ >>> policy.commitError(conflict, data)
+ False
+
+We'll reset the data (which holds the information until ``updateData`` is
+called to store the information persistently, so that aborted transactions
+don't discard the history from past attempts) and try again.
+
+ >>> data = {}
+ >>> policy.commitError(conflict, data)
+ True
+ >>> policy.commitError(conflict, data)
+ True
+ >>> policy.commitError(conflict, data)
+ True
+ >>> policy.commitError(conflict, data)
+ True
+ >>> policy.commitError(conflict, data)
+ False
+
+A ZEO ClientDisconnected error will be retried forever. We treat 50 attempts
+as close enough to "forever". The policy uses time.sleep to back off between
+attempts. We'll stub time.sleep so we can see what happens.
+
+ >>> import time
+ >>> sleep_requests = []
+ >>> def sleep(i):
+ ... sleep_requests.append(i)
+ ...
- BadStatusError: can only call fail on a job with NEW, PENDING, ASSIGNED, or ACTIVE status
+ >>> old_sleep = time.sleep
+ >>> time.sleep = sleep
-Using ``resumeCallbacks``
--------------------------
+ >>> import ZEO.Exceptions
+ >>> disconnect = zc.twist.Failure(ZEO.Exceptions.ClientDisconnected())
-So ``fail`` is the proper way to handle an active job that was being
-worked on by on eof a dead dispatcher's worker thread, but how does one
-handle a job that was in the CALLBACKS status? The answer is to use
-resumeCallbacks. Any job that is still pending will be called; any
-job that is active will be failed; any job that is in the middle
-of calling its own callbacks will have its ``resumeCallbacks`` called; and
-any job that is completed will be ignored.
+ >>> for i in range(50):
+ ... if not policy.jobError(disconnect, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 50
+
+After ``jobError`` has increased the backoff to its maximum, it stays at 60
+seconds for ClientDisconnected errors at commit.
+
+ >>> policy.commitError(disconnect, data)
+ True
+ >>> sleep_requests[50]
+ 60
+
+Here's the backoff happening in commitError.
+
+ >>> del sleep_requests[:]
+ >>> data = {}
+ >>> for i in range(50):
+ ... if not policy.commitError(disconnect, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 50
+
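The sleep pattern above suggests a simple backoff rule: grow by five seconds per attempt, capped at sixty. Assuming that rule (inferred from the doctest output, not read from zc.async's source), the schedule reproduces directly:

```python
# Reproduce the backoff pattern observed above. The min(step * n, cap)
# rule is an inference from the printed sleep requests, not a quote of
# zc.async's implementation.
def backoff_schedule(attempts, step=5, cap=60):
    return [min(step * n, cap) for n in range(1, attempts + 1)]

schedule = backoff_schedule(50)
# schedule[:12] == [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60];
# every later entry stays at the 60-second cap.
```
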
+If we encounter another kind of error, no retries are requested.
+
+ >>> runtime = zc.twist.Failure(RuntimeError())
+ >>> policy.jobError(runtime, data)
+ False
+ >>> policy.commitError(runtime, data)
+ False
+ >>> value = zc.twist.Failure(ValueError())
+ >>> policy.jobError(value, data)
+ False
+ >>> policy.commitError(value, data)
+ False
+
+When the system is interrupted, ``handleInterrupt`` uses the retry_policy's
+``interrupted`` method to determine what to do. It does not need to pass a
+data dictionary. The default policy will retry nine times, for a total of
+ten attempts.
+
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ False
+
+``updateData`` is only used after ``jobError`` and ``commitError``, and is
+called every time there is a commit attempt, so we're wildly out of order here,
+but we'll call the method now anyway to show we can, and then perform the job.
+
+ >>> transaction.commit()
+ >>> policy.updateData(data)
+ >>> j()
+ 10
+
+The policy may also want to perform additional logging.
+
+The other policies perform similarly. [#show_other_policies]_
+
+To change a policy on a job, you have two options. First, you can change the
+``retry_policy_factory`` on your instance.
+
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> j.retry_policy_factory = zc.async.job.NeverRetry
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.NeverRetry)
+ True
+
+Second, you can leverage the fact that the default policy tries to adapt the
+job to zc.async.interfaces.IRetryPolicy (with the default name of ''), and only
+if that fails does it use RetryCommonFourTimes.
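In other words, ``getRetryPolicy`` prefers a per-job factory, then an adapter, then the default. The sketch below stands in for the zope.component machinery with plain arguments so it runs standalone; the function and string names are illustrative, not zc.async's code:

```python
# Illustrative fallback chain for choosing a retry-policy factory, per
# the description above. Plain strings stand in for the real classes;
# this is not zc.async's actual lookup code.
DEFAULT = 'RetryCommonFourTimes'

def choose_factory(instance_factory=None, registered_adapter=None):
    if instance_factory is not None:      # job.retry_policy_factory wins
        return instance_factory
    if registered_adapter is not None:    # then an IRetryPolicy adapter
        return registered_adapter
    return DEFAULT                        # finally the built-in default
```
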
+
+ >>> import zope.component
+ >>> zope.component.provideAdapter(
+ ... zc.async.job.RetryCommonForever,
+ ... provides=zc.async.interfaces.IRetryPolicy)
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonForever)
+ True
+
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+ ... zc.async.job.RetryCommonForever,
+ ... provided=zc.async.interfaces.IRetryPolicy) # tearDown
+ True
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+ True
+
+If you are working with callbacks, the default retry policy is
+``RetryCommonForever``.
+
+ >>> def foo(result):
+ ... print result
+ ...
+ >>> callback = j.addCallback(foo)
+ >>> policy = callback.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonForever)
+ True
+
+This can be changed in the same kind of way as a non-callback job. You can
+set the ``retry_policy_factory``.
+
+ >>> callback = j.addCallback(foo)
+ >>> callback.retry_policy_factory = zc.async.job.NeverRetry
+ >>> policy = callback.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.NeverRetry)
+ True
+
+You can also register an adapter. In this case it must be an adapter
+registered with the 'callback' name.
+
+ >>> zope.component.provideAdapter(
+ ... zc.async.job.RetryCommonFourTimes,
+ ... provides=zc.async.interfaces.IRetryPolicy, name='callback')
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> callback = j.addCallback(foo)
+ >>> policy = callback.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+ True
+
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+ ... zc.async.job.RetryCommonFourTimes,
+ ... provided=zc.async.interfaces.IRetryPolicy, name='callback') # tearDown
+ True
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> callback = j.addCallback(foo)
+ >>> policy = callback.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonForever)
+ True
+
+Finally, it is worth noting that, typically, once a retry policy has been
+obtained with ``getRetryPolicy``, changing the factory or adapter registration
+will not change the policy: it has already been instantiated and stored on the
+job.
+
+ >>> callback.retry_policy_factory = zc.async.job.NeverRetry
+ >>> isinstance(callback.getRetryPolicy(), zc.async.job.NeverRetry)
+ False
+ >>> policy is callback.getRetryPolicy()
+ True
+
+Now we'll look at the standard retry policies in use as we examine more failure
+scenarios below.
+
+---------------------
+Failures in Your Code
+---------------------
+
+As seen above, failures in your code will generally not be retried by the
+default retry policy, but passed on as a Failure object as the job's
+``result``. The only exceptions are ConflictErrors (retried for a total of
+five attempts) and ClientDisconnected (retried forever). Let's manufacture
+some of these problems.
+
+First, a few "normal" errors. They just go to the result.
+
+ >>> count = 0
+ >>> max = 50
+ >>> def raiseAnError(klass):
+ ... global count
+ ... count += 1
+ ... if count < max:
+ ... raise klass()
+ ... else:
+ ... print 'tried %d times. stopping.' % (count,)
+ ... return 42
+ ...
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError, ValueError)
+ >>> transaction.commit()
+ >>> job()
+ <zc.twist.Failure exceptions.ValueError>
+ >>> job.result
+ <zc.twist.Failure exceptions.ValueError>
+ >>> count
+ 1
+ >>> count = 0
+
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+ >>> transaction.commit()
+ >>> job()
+ <zc.twist.Failure exceptions.TypeError>
+ >>> job.result
+ <zc.twist.Failure exceptions.TypeError>
+ >>> count
+ 1
+ >>> count = 0
+
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError, RuntimeError)
+ >>> transaction.commit()
+ >>> job()
+ <zc.twist.Failure exceptions.RuntimeError>
+ >>> job.result
+ <zc.twist.Failure exceptions.RuntimeError>
+ >>> count
+ 1
+ >>> count = 0
+
+If we raise a ConflictError (or any TransactionError), that will try five
+times, and then be let through, so it will look very similar to the previous
+examples except for our ``count`` variable.
+
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+ ... ZODB.POSException.ConflictError)
+ >>> transaction.commit()
+ >>> job()
+ <zc.twist.Failure ZODB.POSException.ConflictError>
+ >>> job.result
+ <zc.twist.Failure ZODB.POSException.ConflictError>
+ >>> count
+ 5
+ >>> count = 0
+
+It's worth showing that, as you'd expect, if the job succeeds within the
+allotted retries, everything is fine.
+
+ >>> max = 3
+
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+ ... ZODB.POSException.ConflictError)
+ >>> transaction.commit()
+ >>> job()
+ tried 3 times. stopping.
+ 42
+ >>> job.result
+ 42
+ >>> count
+ 3
+ >>> count = 0
+
+If we raise a ClientDisconnected error, that will retry forever, with a
+backoff. Our job doesn't let this run forever, but the retry policy would.
+
+ >>> max = 50
+ >>> del sleep_requests[:]
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+ ... ZEO.Exceptions.ClientDisconnected)
+ >>> transaction.commit()
+ >>> job()
+ tried 50 times. stopping.
+ 42
+ >>> count
+ 50
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 49
+ >>> count = 0
+
+None of the provided retry policies utilize this functionality, but the
+pertinent retry policy method, ``jobError``, can return not only ``True``
+(retry now) or ``False`` (do not retry) but also a datetime.datetime or a
+datetime.timedelta. Datetimes and timedeltas indicate that the policy requests
+that the job be returned to the queue to be claimed again. Let's give an
+example of this.
+
+As implied by this description, this functionality only makes sense within
+a fuller zc.async context. We need a stub Agent as a job's parent, and a stub
+Queue in its ``parent`` lineage.
+
+ >>> class StubQueue(persistent.Persistent):
+ ... zope.interface.implements(zc.async.interfaces.IQueue)
+ ... def putBack(self, job):
+ ... self.put_back = job
+ ... job.parent = self
+ ... def put(self, job, begin_after=None):
+ ... self.put_job = job
+ ... self.put_begin_after = begin_after
+ ... job.parent = self
+ ...
+ >>> class StubAgent(persistent.Persistent):
+ ... zope.interface.implements(zc.async.interfaces.IAgent)
+ ... def __init__(self):
+ ... # usually would have dispatcher agent parent, which would then
+ ... # have queue as parent, but this is a stub
+ ... self.parent = StubQueue()
+ ... def jobCompleted(self, job):
+ ... self.completed = job
+ ... def remove(self, job):
+ ... job.parent = None
+ ... self.removed = job
+ ...
+
+We'll also need a quick stub retry policy that takes advantage of this
+functionality.
+
+ >>> import persistent
+ >>> import datetime
+ >>> class StubRescheduleRetryPolicy(persistent.Persistent):
+ ... zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+ ... _reply = datetime.timedelta(hours=1)
+ ... def __init__(self, job):
+ ... self.parent = self.__parent__ = job
+ ... def jobError(self, failure, data_cache):
+ ... return self._reply
+ ... def commitError(self, failure, data_cache):
+ ... return self._reply
+ ... def interrupted(self):
+ ... return self._reply
+ ... def updateData(self, data_cache):
+ ... pass
+ ...
+
+ >>> j = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+ >>> agent = j.parent = StubAgent()
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> transaction.commit()
+ >>> j is j()
+ True
+
+Notice above that, when this happens, the result of the call is the job itself.
+
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> agent.removed is j
+ True
+ >>> agent.parent.put_job is j
+ True
+ >>> agent.parent.put_begin_after
+ datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+ >>> import pytz
+ >>> j = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+ >>> agent = j.parent = StubAgent()
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+ >>> transaction.commit()
+ >>> j is j()
+ True
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> agent.removed is j
+ True
+ >>> agent.parent.put_job is j
+ True
+ >>> agent.parent.put_begin_after
+ datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+------------------
+Failures on Commit
+------------------
+
+Failures on commit are handled very similarly to those occurring while
+performing the job.
+
+We'll have to hack ``TransactionManager.commit`` to show this.
+
+ >>> from transaction import TransactionManager
+ >>> old_commit = TransactionManager.commit
+ >>> commit_count = 0
+ >>> error = None
+ >>> max = 2
+ >>> allow_commits = [1] # change state to active
+ >>> def new_commit(self):
+ ... global commit_count
+ ... commit_count += 1
+ ... if commit_count in allow_commits:
+ ... old_commit(self) # changing state to "active" or similar
+ ... elif commit_count - len(allow_commits) < max:
+ ... raise error()
+ ... else:
+ ... if commit_count - len(allow_commits) == max:
+ ... print 'tried %d time(s). committing.' % (
+ ... commit_count-len(allow_commits))
+ ... old_commit(self)
+ ...
+ >>> TransactionManager.commit = new_commit
+ >>> count = 0
+ >>> def count_calls():
+ ... global count
+ ... count += 1
+ ... return 42
+ ...
+
+Normal errors will simply pass through with only one try.
+
+ >>> error = ValueError
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 2 time(s). committing.
+ <zc.twist.Failure exceptions.ValueError>
+ >>> count
+ 1
+
+Note that, since the error at commit is obscuring the original result, the
+original result is logged.
+
+ >>> for r in reversed(event_logs.records):
+ ... if r.levelname == 'INFO':
+ ... break
+ ... else:
+ ... assert False, 'could not find log'
+ ...
+ >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Commit failed ... Prior to this, job succeeded with result: 42
+
+This happens both for non-error results (as above) and for error results.
+
+ >>> count = commit_count = 0
+ >>> allow_commits = [1, 2, 3] # set to active, then get retry policy, then call
+ >>> j = root['j'] = zc.async.job.Job(raiseAnError, RuntimeError)
+ >>> transaction.commit()
+ >>> j()
+ tried 2 time(s). committing.
+ <zc.twist.Failure exceptions.ValueError>
+ >>> found = []
+ >>> for r in reversed(event_logs.records):
+ ... if r.levelname == 'ERROR':
+ ... found.append(r)
+ ... if len(found) == 2:
+ ... break
+ ... else:
+ ... assert False, 'could not find log'
+ ...
+ >>> print found[1].getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Commit failed ... Prior to this, job failed with traceback:
+ ...
+ exceptions.RuntimeError...
+
+The ValueError, from transaction time, is the main error recorded.
+
+ >>> print found[0].getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ <...Job...raiseAnError... failed with traceback:
+ ...
+ exceptions.ValueError...
+
+The conflict error will be tried five times.
+
+ >>> max = 6
+ >>> count = commit_count = 0
+ >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13] # set active state, then get retries
+ >>> error = ZODB.POSException.ConflictError
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 6 time(s). committing.
+ <zc.twist.Failure ZODB.POSException.ConflictError>
+ >>> count
+ 5
+
+If it succeeds in the number of tries allotted, the result will be fine.
+
+ >>> max = 3
+ >>> count = commit_count = 0
+ >>> allow_commits = [1, 3, 4, 6, 7]
+ >>> error = ZODB.POSException.ConflictError
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 3 time(s). committing.
+ 42
+ >>> count
+ 3
+
+As an aside, the code regards getting the desired retry policy as a
+requirement, and so will retry until it has one. Here's a repeat of the
+previous example, with some retries themselves getting a failed commit.
+Everything except a ConflictError has a backoff.
+
+ >>> max = 25
+ >>> count = commit_count = 0
+ >>> allow_commits = [1]
+ >>> error = ZODB.POSException.ConflictError
+ >>> del sleep_requests[:]
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 25 time(s). committing.
+ 42
+ >>> count
+ 2
+ >>> sleep_requests
+ []
+
+The ClientDisconnected will be retried forever until it succeeds, with a
+backoff. We're putting in some irregular patterns of commits to test that
+these also work, which skews some of the results.
+
+ >>> max = 50
+ >>> count = commit_count = 0
+ >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 22, 25]
+ >>> del sleep_requests[:]
+ >>> error = ZEO.Exceptions.ClientDisconnected
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 50 time(s). committing.
+ 42
+ >>> count
+ 9
+ >>> len(sleep_requests)
+ 51
+
+None of the provided retry policies utilize this functionality, but the
+pertinent retry policy method, ``commitError``, can return not only ``True``
+(retry now) or ``False`` (do not retry) but also a datetime.datetime or a
+datetime.timedelta. Datetimes and timedeltas indicate that the policy requests
+that the job be returned to the queue to be claimed again. Let's give an
+example of this.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> agent = j.parent = StubAgent()
+ >>> StubRescheduleRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> count = commit_count = 0
+ >>> max = 1
+ >>> error = ValueError
+ >>> transaction.commit()
+ >>> j is j()
+ True
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> agent.removed is j
+ True
+ >>> agent.parent.put_job is j
+ True
+ >>> agent.parent.put_begin_after
+ datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> agent = j.parent = StubAgent()
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+ >>> count = commit_count = 0
+ >>> transaction.commit()
+ >>> j is j()
+ True
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> agent.removed is j
+ True
+ >>> agent.parent.put_job is j
+ True
+ >>> agent.parent.put_begin_after
+ datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+All of this looks very similar to job errors. The only big difference between
+the behavior of job errors and commit errors is that committing a failure
+result *must* succeed: the commit will be retried forever until it succeeds
+or is interrupted. Here's an example with a ConflictError.
+
+ >>> count = commit_count = 0
+ >>> max = 50
+ >>> error = ZODB.POSException.ConflictError
+ >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13]
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 50 time(s). committing.
+ <zc.twist.Failure ZODB.POSException.ConflictError>
+ >>> count
+ 5
+
+ >>> TransactionManager.commit = old_commit
+
+ >>> time.sleep = old_sleep # tearDown
+
+-------------------
+``handleInterrupt``
+-------------------
+
+Not infrequently, systems will be stopped while jobs are in the middle of their
+work. When the clean-up occurs, the job needs to figure out what to do to
+handle the interruption. The right thing to do is to call ``handleInterrupt``.
+This method itself defers to a retry policy to determine what to do.
+
+This method takes no arguments. The default behavior is to retry after up to
+nine interruptions, giving up on the tenth. It expects a parent agent to
+reschedule it in a queue. Let's give an example.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> queue = j.parent = StubQueue()
+ >>> j._status = zc.async.interfaces.ACTIVE
+ >>> transaction.commit()
+ >>> j.handleInterrupt()
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> queue.put_back is j
+ True
+
+After nine interruptions, the default policy will give up on the tenth try.
+
+ >>> for i in range(8):
+ ... j.parent = agent
+ ... j._status = zc.async.interfaces.ACTIVE
+ ... j.handleInterrupt()
+ ... if j.status != zc.async.interfaces.PENDING:
+ ... print 'error', i, j.status
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> j.parent = agent
+ >>> j._status = zc.async.interfaces.ACTIVE
+ >>> j.handleInterrupt()
+ >>> j.status == zc.async.interfaces.COMPLETED
+ True
+ >>> j.result
+ <zc.twist.Failure zc.async.interfaces.AbortedError>
+
+If the retry policy returns True, as happens above, this should be interpreted
+by the agent and queue as a request to immediately reschedule the job. The
+policy can also return a datetime.timedelta or datetime.datetime to ask that
+the job be rescheduled in the future.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> queue = j.parent = StubQueue()
+ >>> StubRescheduleRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> j._status = zc.async.interfaces.ACTIVE
+ >>> transaction.commit()
+ >>> j.handleInterrupt()
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> queue.put_job is j
+ True
+ >>> queue.put_begin_after
+ datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> queue = j.parent = StubQueue()
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+ >>> j._status = zc.async.interfaces.ACTIVE
+ >>> transaction.commit()
+ >>> j.handleInterrupt()
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> queue.put_job is j
+ True
+ >>> queue.put_begin_after
+ datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+In a full zc.async context, rescheduling is more of a dance between the job,
+the agent, and the queue. See `catastrophes.txt` for more information.
+
+-------------------
+``resumeCallbacks``
+-------------------
+
+``handleInterrupt`` should be called when a job was working on its own code.
+But what happens if the system stops while a job was working on its callbacks?
+When the job is handled, the system should call ``resumeCallbacks``. The
+method will call any callback that is still pending and not timed out because
+of ``begin_by``; it will call ``fail`` on any callback that is pending and
+timed out; it will call ``handleInterrupt`` on any callback that is active; it
+will call ``resumeCallbacks`` on any callback that itself has the CALLBACKS
+status; and it will ignore any callback that is completed. As such, it
+encompasses all of the retry behavior discussed above. Moreover, while it does
+not use retry policies directly, indirectly they are often used.
+
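The dispatch rules just described boil down to a status-to-action table. The function below is an illustrative sketch of that decision, not zc.async's code; the status names mirror the zc.async.interfaces constants:

```python
# Map a callback's status to the action resumeCallbacks takes, per the
# rules above. Status names mirror zc.async.interfaces constants; the
# dispatch function itself is an illustrative sketch only.
PENDING, ACTIVE, CALLBACKS, COMPLETED = 'PENDING', 'ACTIVE', 'CALLBACKS', 'COMPLETED'

def resume_action(status, timed_out=False):
    if status == PENDING:
        # A pending callback is failed if its begin_by deadline passed.
        return 'fail' if timed_out else 'call'
    if status == ACTIVE:
        return 'handleInterrupt'
    if status == CALLBACKS:
        return 'resumeCallbacks'
    return 'ignore'  # COMPLETED
```
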
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> j._result = 10
>>> j._status = zc.async.interfaces.CALLBACKS
>>> completed_j = zc.async.job.Job(multiply, 3)
@@ -517,11 +1369,8 @@
80
>>> sub_callbacks_j.status == zc.async.interfaces.COMPLETED
True
- >>> print active_j.result.getTraceback()
- ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- Traceback (most recent call last):
- ...
- zc.async.interfaces.AbortedError:
+ >>> active_j.result # was retried because of default retry policy
+ 50
>>> active_j.status == zc.async.interfaces.COMPLETED
True
>>> pending_j.result
@@ -529,8 +1378,12 @@
>>> pending_j.status == zc.async.interfaces.COMPLETED
True
-[#aborted_active_callback_log]_
+[#retried_active_callback_log]_
+Introspection
+=============
+
+------------------------------------
Introspecting and Mutating Arguments
------------------------------------
@@ -567,6 +1420,7 @@
>>> res = j() # doctest: +ELLIPSIS
<zc.async.job.Job (oid ...>
+-----------------
Result and Status
-----------------
@@ -673,6 +1527,7 @@
- Also, a job's direct callback may not call the job
[#callback_self]_.
+----------------------
More Job Introspection
----------------------
@@ -769,6 +1624,8 @@
True
>>> transaction.abort()
+ >>> time.sleep = old_sleep # probably put in test tearDown
+
=========
Footnotes
=========
@@ -783,15 +1640,17 @@
>>> root = conn.root()
>>> import zc.async.configure
>>> zc.async.configure.base()
+ >>> import zc.async.testing
+ >>> zc.async.testing.setUpDatetime() # pins datetimes
.. [#verify] Verify interface
>>> from zope.interface.verify import verifyObject
>>> verifyObject(zc.async.interfaces.IJob, j)
True
-
+
Note that status and result are readonly.
-
+
>>> j.status = 1
Traceback (most recent call last):
...
@@ -804,9 +1663,9 @@
.. [#no_live_frames] Failures have two particularly dangerous bits: the
traceback and the stack. We use the __getstate__ code on Failures
to clean them up. This makes the traceback (``tb``) None...
-
+
>>> j.result.tb # None
-
+
...and it makes all of the values in the stack--the locals and
globals-- into strings. The stack is a list of lists, in which each
internal list represents a frame, and contains five elements: the
@@ -814,14 +1673,14 @@
the line number (``f_lineno``), an items list of the locals, and an
items list for the globals. All of the values in the items list
would normally be objects, but are now strings.
-
+
>>> for (codename, filename, lineno, local_i, global_i) in j.result.stack:
... for k, v in local_i:
... assert isinstance(v, basestring), 'bad local %s' % (v,)
... for k, v in global_i:
... assert isinstance(v, basestring), 'bad global %s' % (v,)
...
-
+
Here's a reasonable question. The Twisted Failure code has a
__getstate__ that cleans up the failure, and that's even what we are
using to sanitize the failure. If the failure is attached to a
@@ -845,7 +1704,7 @@
desired changes to the transaction (such as aborting) before the
job calls commit. Compare. Here is a call that raises an
exception, and rolls back changes.
-
+
(Note that we are passing arguments to the job, a topic that has
not yet been discussed in the text when this footnote is given: read
on a bit in the main text to see the details, if it seems surprising
@@ -942,14 +1801,16 @@
... assert False, 'could not find log'
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- <zc.async.job.Job (oid 114, db 'unnamed')
+ <zc.async.job.Job (oid ..., db 'unnamed')
``zc.async.job.completeStartedJobArguments(zc.async.job.Job
- (oid 109, db 'unnamed'))``>
+ (oid ..., db 'unnamed'))``>
succeeded with result:
None
- However, the callbacks that fail themselves are logged (again, as all jobs
- are) at the "error" level and include a detailed traceback.
+ However, the callbacks that fail themselves are logged (by default at a
+ "CRITICAL" level because callbacks are often in charge of error handling,
+ and having an error in your error handler may be a serious problem) and
+ include a detailed traceback.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 4)
>>> transaction.commit()
@@ -957,8 +1818,8 @@
>>> j() # doctest: +ELLIPSIS
20
- >>> for r in reversed(trace_logs.records):
- ... if r.levelname == 'ERROR':
+ >>> for r in reversed(event_logs.records):
+ ... if r.levelname == 'CRITICAL':
... break
... else:
... assert False, 'could not find log'
@@ -966,29 +1827,204 @@
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<...Job...``zc.async.doctest_test.multiply()``> failed with traceback:
*--- Failure #... (pickled) ---
- .../zc/async/job.py:...
+ ...job.py:...
[ Locals...
- res : 'None...
- ( Globals...
- .../zc/async/job.py:...
- [ Locals...
effective_args : '[20]...
( Globals...
exceptions.TypeError: multiply() takes at least 2 arguments (1 given)
*--- End of Failure #... ---
<BLANKLINE>
-.. [#aborted_active_callback_log] The fact that the pseudo-aborted "active"
- job failed is logged.
+.. [#show_other_policies]
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> j.retry_policy_factory = zc.async.job.RetryCommonForever
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonForever)
+ True
+
+ Here's the policy requesting that a job be tried "forever" (we show 50
+ times), both in the job and in the commit.
+
+ >>> import zc.twist
+ >>> import ZODB.POSException
+ >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+ >>> data = {}
+
+ >>> for i in range(50):
+ ... if not policy.jobError(conflict, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+ >>> for i in range(50):
+ ... if not policy.commitError(conflict, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+ A ZEO ClientDisconnected error will also be retried forever, but with a
+ backoff.
+
+ >>> import ZEO.Exceptions
+ >>> disconnect = zc.twist.Failure(ZEO.Exceptions.ClientDisconnected())
+ >>> del sleep_requests[:]
+
+ >>> for i in range(50):
+ ... if not policy.jobError(disconnect, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 50
+ >>> del sleep_requests[:]
+
+ >>> data = {}
+ >>> for i in range(50):
+ ... if not policy.commitError(disconnect, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 50
+
+ If we encounter another kind of error during the job, no retries are
+ requested.
+
+ >>> runtime = zc.twist.Failure(RuntimeError())
+ >>> policy.jobError(runtime, data)
+ False
+ >>> value = zc.twist.Failure(ValueError())
+ >>> policy.jobError(value, data)
+ False
+
+ However, during the commit, any kind of error will cause a retry, forever.
+
+ >>> for i in range(50):
+ ... if not policy.commitError(runtime, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+ >>> for i in range(50):
+ ... if not policy.commitError(value, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+ When the system is interrupted, ``handleInterrupt`` uses the retry_policy's
+ ``interrupted`` method to determine what to do. It does not need to pass a
+ data dictionary. This also happens forever.
+
+ >>> for i in range(50):
+ ... if not policy.interrupted():
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+ ``updateData`` is only used after ``jobError`` and ``commitError``, and is
+ called every time there is a commit attempt, so we're wildly out of order here,
+ but we'll call the method now anyway to show we can, and then perform the job.
+
+ >>> transaction.commit()
+ >>> policy.updateData(data)
+ >>> j()
+ 10
+
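The data-dictionary protocol described above can be sketched with a toy
policy. This is hypothetical, not zc.async's implementation: the error hooks
accumulate counters in the passed-in ``data_cache``, and ``updateData``
merges that state back after a commit attempt. The nine-retry limit mirrors
the default policy's behavior described later in this document.

```python
class CountingRetrySketch(object):
    """Toy retry policy: retry up to ``max_retries`` times, then give up."""
    max_retries = 9  # nine retries, ten attempts total

    def __init__(self, job=None):
        self.parent = self.__parent__ = job
        self.data = {}

    def jobError(self, failure, data_cache):
        count = data_cache['job_error'] = data_cache.get('job_error', 0) + 1
        return count <= self.max_retries  # True means "retry now"

    def commitError(self, failure, data_cache):
        count = data_cache['commit_error'] = (
            data_cache.get('commit_error', 0) + 1)
        return count <= self.max_retries

    def interrupted(self):
        return True  # retry interrupted jobs immediately

    def updateData(self, data_cache):
        self.data.update(data_cache)  # keep the attempt counters
```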
+Now we will show ``NeverRetry``. As the name implies, this is a simple policy
+for jobs that should never retry. Typical reasons for this are that the job is
+talking to an external system or doing something else that is effectively not
+transactional. More sophisticated policies for specific versions of this
+scenario may be possible; however, also consider callbacks for this use case.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> j.retry_policy_factory = zc.async.job.NeverRetry
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.NeverRetry)
+ True
+
+ So, here's a bunch of "no"s.
+
+ >>> import zc.twist
+ >>> import ZODB.POSException
+ >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+ >>> data = {}
+
+ >>> policy.jobError(conflict, data)
+ False
+
+ >>> policy.commitError(conflict, data)
+ False
+
+ >>> policy.jobError(disconnect, data)
+ False
+
+ >>> policy.commitError(disconnect, data)
+ False
+
+ >>> runtime = zc.twist.Failure(RuntimeError())
+ >>> policy.jobError(runtime, data)
+ False
+
+ >>> policy.commitError(runtime, data)
+ False
+
+ >>> value = zc.twist.Failure(ValueError())
+ >>> policy.jobError(value, data)
+ False
+
+ >>> policy.commitError(value, data)
+ False
+
+ >>> policy.interrupted()
+ False
+
+ ``updateData`` is only used after ``jobError`` and ``commitError``, and is
+ called every time there is a commit attempt, so we're wildly out of order here,
+ but we'll call the method now anyway to show we can, and then perform the job.
+
+ >>> transaction.commit()
+ >>> policy.updateData(data)
+ >>> j()
+ 10
+
+.. [#retried_active_callback_log] The fact that the pseudo-aborted "active"
+ job was retried is logged.
+
>>> for r in reversed(trace_logs.records):
- ... if r.levelname == 'DEBUG' and r.getMessage().startswith('failing'):
+ ... if (r.levelname == 'DEBUG' and
+ ... r.getMessage().startswith('retrying interrupted')):
... break
... else:
... assert False, 'could not find log'
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- failing aborted callback
+ retrying interrupted callback
<...Job...``zc.async.doctest_test.multiply(5)``> to <...Job...>
.. [#call_self] Here's a job trying to call itself.
Modified: zc.async/branches/dev/src/zc/async/queue.py
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.py 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/queue.py 2008-06-17 02:38:06 UTC (rev 87444)
@@ -13,6 +13,7 @@
##############################################################################
import datetime
import bisect
+import logging
import pytz
import persistent
import persistent.interfaces
@@ -23,6 +24,7 @@
import zope.component
import zope.event
import zope.bforest
+import zope.minmax
import zc.queue
import zc.dict
@@ -43,22 +45,20 @@
UUID = None
activated = None
-
+
def __init__(self, uuid):
super(DispatcherAgents, self).__init__()
self.UUID = uuid
- self.__class__.last_ping.initialize(self)
-
- zc.async.utils.createAtom('last_ping', None)
-
+ self.last_ping = zope.minmax.Maximum()
+
ping_interval = datetime.timedelta(seconds=30)
ping_death_interval = datetime.timedelta(seconds=60)
@property
def dead(self):
- last_ping = self.last_ping
+ last_ping = self.last_ping.value
if self.activated and (
- self.last_ping is None or self.activated > self.last_ping):
+ last_ping is None or self.activated > last_ping):
last_ping = self.activated
elif last_ping is None:
return False
@@ -103,27 +103,43 @@
else:
while job is not None:
status = job.status
- if status == zc.async.interfaces.ASSIGNED:
+ if status in (zc.async.interfaces.PENDING,
+ zc.async.interfaces.ASSIGNED):
+ # odd
+ zc.async.utils.log.warning(
+ 'unexpected job status %s for %r; treating as NEW',
+ status, job)
+ status = zc.async.interfaces.NEW
+ if status == zc.async.interfaces.NEW:
tmp = job.assignerUUID
job.assignerUUID = None
job.parent = None
queue.put(job)
job.assignerUUID = tmp
elif job.status == zc.async.interfaces.ACTIVE:
- queue.put(job.handleInterrupt)
+ j = queue.put(
+ job.handleInterrupt,
+ retry_policy_factory=zc.async.job.RetryCommonForever,
+ error_log_level=logging.CRITICAL)
elif job.status == zc.async.interfaces.CALLBACKS:
- queue.put(job.resumeCallbacks)
+ j = queue.put(
+ job.resumeCallbacks,
+ retry_policy_factory=zc.async.job.RetryCommonForever,
+ error_log_level=logging.CRITICAL)
elif job.status == zc.async.interfaces.COMPLETED:
# huh, that's odd.
agent.completed.add(job)
+ zc.async.utils.log.warning(
+ 'unexpectedly had to inform agent of completion '
+ 'of %r', job)
try:
job = agent.pull()
except IndexError:
job = None
zope.event.notify(
zc.async.interfaces.DispatcherDeactivated(self))
-
+
class Queues(zc.async.utils.Dict):
def __setitem__(self, key, value):
@@ -161,9 +177,10 @@
if not da.activated:
raise ValueError('UUID is not activated.')
now = datetime.datetime.now(pytz.UTC)
- if (da.last_ping is None or
- da.last_ping + da.ping_interval <= now):
- da.last_ping = now
+ last_ping = da.last_ping.value
+ if (last_ping is None or
+ last_ping + da.ping_interval <= now):
+ da.last_ping.value = now
next = self._getNextActiveSibling(uuid)
if next is not None and next.dead:
# `next` seems to be a dead dispatcher.
@@ -188,28 +205,37 @@
self.size = size
def clean(self):
+ now = datetime.datetime.now(pytz.UTC)
for i, job in enumerate(reversed(self._data)):
- if job.status in (
- zc.async.interfaces.CALLBACKS,
- zc.async.interfaces.COMPLETED):
+ status = job.status
+ if status in (zc.async.interfaces.CALLBACKS,
+ zc.async.interfaces.COMPLETED) or (
+ status == zc.async.interfaces.PENDING and
+ job.begin_after > now): # for a rescheduled task
self._data.pull(-1-i)
@property
def filled(self):
return len(self._data) >= self.size
+ def __contains__(self, item):
+ for i in self:
+ if i is item:
+ return True
+ return False
+
def add(self, item):
+ if item in self:
+ return
if not zc.async.interfaces.IJob.providedBy(item):
raise ValueError('must be IJob')
if self.name not in item.quota_names:
raise ValueError('quota name must be in quota_names')
- # self.clean()
if self.filled:
raise ValueError('Quota is filled')
self._data.put(item)
- for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'get',
- '__contains__'):
+ for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'get'):
locals()[nm] = zc.async.utils.simpleWrapper(nm)
@@ -229,6 +255,8 @@
class Queue(zc.async.utils.Base):
zope.interface.implements(zc.async.interfaces.IQueue)
+ _putback_queue = None
+
def __init__(self):
self._queue = zc.queue.CompositeQueue()
self._held = BTrees.OOBTree.OOBTree()
@@ -239,13 +267,13 @@
self.dispatchers.__parent__ = self
def put(self, item, begin_after=None, begin_by=None,
- error_log_level=None, retry_policy=None):
+ error_log_level=None, retry_policy_factory=None):
item = zc.async.interfaces.IJob(item)
if error_log_level is not None:
item.error_log_level = error_log_level
- if retry_policy is not None:
- item.retry_policy = retry_policy
- if item.assignerUUID is not None:
+ if retry_policy_factory is not None:
+ item.retry_policy_factory = retry_policy_factory
+ if item.status != zc.async.interfaces.NEW:
raise ValueError(
'cannot add already-assigned job')
for name in item.quota_names:
@@ -258,10 +286,9 @@
item.begin_after = now
if begin_by is not None:
item.begin_by = begin_by
- elif item.begin_by is None:
- item.begin_by = datetime.timedelta(hours=1) # good idea?
- item.assignerUUID = zope.component.getUtility(
- zc.async.interfaces.IUUID)
+ if item.assignerUUID is None: # rescheduled job keeps old UUID
+ item.assignerUUID = zope.component.getUtility(
+ zc.async.interfaces.IUUID)
if item._p_jar is None:
# we need to do this if the job will be stored in another
# database as well during this transaction. Also, _held storage
@@ -279,7 +306,35 @@
self._length.change(1)
return item
+ def putBack(self, item):
+ # an agent has claimed a job, but now the job needs to be returned. the
+ # only current caller for this is a job's ``handleInterrupt`` method.
+ # The scenario for this is that the agent's dispatcher died while the
+ # job was active, interrupting the work; and the job's retry policy
+ # asks that the job be put back on the queue to be claimed immediately.
+ # This method puts the job in a special internal queue that ``_iter``
+ # looks at first. This allows jobs to maintain their order, if needed,
+ # within a quota.
+ assert zc.async.interfaces.IJob.providedBy(item)
+ assert item.status == zc.async.interfaces.NEW, item.status
+ assert item.begin_after is not None
+ assert item._p_jar is not None
+ # to support legacy instances of the queue that were created before
+ # this functionality and its separate internal data structure were
+ # part of the code, we instantiate the _putback_queue when we first
+ # need it, here.
+ if self._putback_queue is None:
+ self._putback_queue = zc.queue.CompositeQueue()
+ self._putback_queue.put(item)
+ item.parent = self
+ self._length.change(1)
+
def _iter(self):
+ putback_queue = self._putback_queue
+ if putback_queue: # not None and not empty
+ dq_pop = putback_queue.pull
+ for dq_ix, dq_next in enumerate(putback_queue):
+ yield dq_pop, dq_ix, dq_next
queue = self._queue
tree = self._held
q = enumerate(queue)
@@ -336,16 +391,17 @@
if not self._length():
return default
uuid = None
+ quotas_cleaned = set()
for pop, ix, job in self._iter():
if job.begin_after > now:
break
res = None
quotas = []
- if (job.begin_after + job.begin_by) < now:
- res = zc.async.interfaces.IJob(
- job.fail) # specify TimeoutError?
+ if (job.begin_by is not None and
+ (job.begin_after + job.begin_by) < now):
+ res = zc.async.interfaces.IJob(job.fail)
+ res.args.append(zc.async.interfaces.TimeoutError())
res.begin_after = now
- res.begin_by = datetime.timedelta(hours=1)
res.parent = self
if uuid is None:
uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
@@ -354,8 +410,10 @@
for name in job.quota_names:
quota = self.quotas.get(name)
if quota is not None:
- quota.clean()
- if quota.filled:
+ if name not in quotas_cleaned:
+ quota.clean()
+ quotas_cleaned.add(name)
+ if quota.filled and job not in quota:
break
quotas.append(quota)
else:
Modified: zc.async/branches/dev/src/zc/async/queue.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.txt 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/queue.txt 2008-06-17 02:38:06 UTC (rev 87444)
@@ -24,7 +24,7 @@
[#queues_collection]_ As shown in the README.txt of this package (or see
zc.async.adapters.defaultQueueAdapter), the queue with the name '' will
typically be registered as an adapter to persistent objects that
-provides zc.async.interfaces.IQueue [#verify]_.
+provides zc.async.interfaces.IQueue [#verify]_.
The queue doesn't have any jobs yet.
@@ -82,25 +82,27 @@
>>> job.parent is queue
True
>>> transaction.commit()
-
+
A job added without any special calls gets a `begin_after` attribute
of now.
>>> import datetime
>>> import pytz
- >>> now = datetime.datetime.now(pytz.UTC)
+ >>> now = datetime.datetime.now(pytz.UTC)
>>> now
datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=<UTC>)
>>> job.begin_after == now
True
-A ``begin_by`` attribute is a duration, and defaults to one hour. This
-means that it must be completed an hour after the ``begin_after`` datetime,
-or else the system will fail it.
+A ``begin_by`` attribute is a duration (datetime.timedelta) or None, and
+defaults to None.
- >>> job.begin_by == datetime.timedelta(hours=1)
+ >>> job.begin_by == None
True
+If ``begin_by`` were an hour, that would mean that it must be completed an hour
+after the ``begin_after`` datetime, or else the system will fail it.
+
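The rule just described can be sketched as a predicate. This is an
illustrative helper only; in zc.async the real check lives in the queue's
``claim`` machinery, as the queue.py hunk later in this diff shows.

```python
def timed_out(begin_after, begin_by, now):
    """True when a job's deadline has passed.

    ``begin_by`` is a timedelta or None; None means the job never
    times out, mirroring the default described above.
    """
    return begin_by is not None and (begin_after + begin_by) < now
```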
Now let's add a job to be performed later, using ``begin_after``.
This means that it's immediately ready to be performed: we can ``claim`` it.
@@ -240,9 +242,9 @@
- If begin_after + begin_by < now, a job that makes the original job fail
is used instead.
-- If a job has one or more ``quota_names`` and the associated quotas are
- filled with jobs not in the CALLBACKS or COMPLETED status then it will
- not be returned.
+- If a job is not failing and has one or more ``quota_names`` and the
+ associated quotas are filled with jobs not in the CALLBACKS or COMPLETED
+ status then it will not be returned.
- It does not take an index argument.
@@ -391,22 +393,46 @@
... @property
... def queue(self):
... return self.parent.parent
- ... def claimJob(self):
+ ... def claimJob(self, filter=None):
... if len(self) < self.size:
- ... job = self.queue.claim()
+ ... job = self.queue.claim(filter=filter)
... if job is not None:
... job.parent = self
... self.append(job)
... return job
- ... def pull(self):
- ... return self.pop(0)
+ ... def pull(self, index=0):
+ ... res = self.pop(index)
+ ... res.parent = None
+ ... return res
+ ... def remove(self, item):
+ ... self.pull(self.index(item))
... def jobCompleted(self, job):
+ ... persistent.list.PersistentList.remove(self, job)
... self.completed.add(job)
- ...
+ ... def reschedule(self, item, when):
+ ... now = datetime.datetime.utcnow()
+ ... if isinstance(when, datetime.datetime):
+ ... self.remove(item)
+ ... if when.tzinfo is not None:
+ ... when = when.astimezone(pytz.UTC).replace(tzinfo=None)
+ ... if when <= now:
+ ... self.queue.disclaim(item)
+ ... else:
+ ... self.queue.put(item, begin_after=when)
+ ... elif isinstance(when, datetime.timedelta):
+ ... self.remove(item)
+ ... if when <= datetime.timedelta():
+ ... self.queue.disclaim(item)
+ ... else:
+ ... self.queue.put(item, begin_after=now+when)
+ ... else:
+ ... raise TypeError('``when`` must be datetime or timedelta')
+ ...
>>> job4 is queue.claim()
True
>>> job4.parent = StubAgent()
+ >>> job4.parent.append(job4)
>>> job4.status == zc.async.interfaces.ASSIGNED
True
>>> list(quota) == [job4]
@@ -426,23 +452,25 @@
timed out are returned in wrappers that fail the original job.
>>> job9_from_outer_space = queue.put(mock_work)
+ >>> job9_from_outer_space.begin_by = datetime.timedelta(hours=1)
>>> zc.async.testing.set_now(
... datetime.datetime.now(pytz.UTC) +
... job9_from_outer_space.begin_by + datetime.timedelta(seconds=1))
>>> job9 = queue.claim()
>>> job9 is job9_from_outer_space
False
- >>> stub = root['stub'] = StubAgent()
+ >>> stub = root['stub'] = StubAgent()
>>> job9.parent = stub
+ >>> stub.append(job9)
>>> transaction.commit()
>>> job9()
>>> job9_from_outer_space.status == zc.async.interfaces.COMPLETED
True
>>> print job9_from_outer_space.result.getTraceback()
+ ... # doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
- Failure: zc.async.interfaces.AbortedError:
+ Failure: zc.async.interfaces.TimeoutError:
<BLANKLINE>
-
Dispatchers
===========
@@ -492,12 +520,12 @@
>>> print da.activated
None
- >>> print da.last_ping
+ >>> print da.last_ping.value
None
-When the object's ``last_ping`` + ``ping_interval`` is greater than now,
-a new ``last_ping`` should be recorded, as we'll see below. If the
-``last_ping`` (or ``activated``, if more recent) +
+When the object's ``last_ping.value`` + ``ping_interval`` is greater than now,
+a new ``last_ping.value`` should be recorded, as we'll see below. If the
+``last_ping.value`` (or ``activated``, if more recent) +
``ping_death_interval`` is older than now, the dispatcher is considered to
be ``dead``.
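The liveness rule above can be sketched as a standalone predicate. This is
illustrative only; it mirrors the ``dead`` property changed in queue.py
earlier in this diff.

```python
import datetime

def is_dead(activated, last_ping, now,
            ping_death_interval=datetime.timedelta(seconds=60)):
    """True when the dispatcher's last sign of life is too old.

    ``last_ping`` is the ``last_ping.value`` datetime (or None);
    ``activated`` wins when it is more recent.
    """
    if activated and (last_ping is None or activated > last_ping):
        last_ping = activated
    elif last_ping is None:
        return False  # never activated, never pinged: not dead
    return last_ping + ping_death_interval < now
```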
@@ -514,7 +542,7 @@
>>> import pytz
>>> now = datetime.datetime.now(pytz.UTC)
>>> da.activate()
- >>> now <= da.activated <= datetime.datetime.now(pytz.UTC)
+ >>> now <= da.activated <= datetime.datetime.now(pytz.UTC)
True
It's still not dead. :-)
@@ -638,10 +666,10 @@
>>> before = datetime.datetime.now(pytz.UTC)
>>> pollStub(conn)
- >>> before <= da.last_ping <= datetime.datetime.now(pytz.UTC)
+ >>> before <= da.last_ping.value <= datetime.datetime.now(pytz.UTC)
True
-We don't have any jobs to claim yet. Let's add one and do it again.
+We don't have any jobs to claim yet. Let's add one and do it again.
We'll use a test fixture, time_flies, to make the time change.
>>> job10 = queue.put(mock_work)
@@ -652,10 +680,10 @@
... datetime.timedelta(seconds=seconds))
...
- >>> last_ping = da.last_ping
+ >>> last_ping = da.last_ping.value
>>> time_flies(5)
>>> pollStub(conn)
- >>> da.last_ping == last_ping
+ >>> da.last_ping.value == last_ping
True
>>> len(jobs_to_do)
@@ -669,11 +697,11 @@
>>> time_flies(10)
>>> pollStub(conn)
- >>> da.last_ping == last_ping
+ >>> da.last_ping.value == last_ping
True
>>> time_flies(15)
>>> pollStub(conn)
- >>> da.last_ping > last_ping
+ >>> da.last_ping.value > last_ping
True
Dead Dispatchers
@@ -684,7 +712,7 @@
then proceed. But what if a queue has more than one simultaneous
dispatcher? How do we know to clean out the dead dispatcher's jobs?
-The ``ping`` method not only changes the ``last_ping`` but checks the
+The ``ping`` method not only changes the ``last_ping.value`` but checks the
next sibling dispatcher, as defined by UUID, to make sure that it is not
dead. It uses the ``dead`` attribute, introduced above, to test whether
the sibling is alive.
@@ -708,8 +736,8 @@
False
Let's do that again, with an agent in the new dispatcher and some jobs in the
-agent. Assigned jobs will be reassigned; in-progress jobs will have a
-new task that fails them; callback jobs will resume their callback; and
+agent. Assigned jobs will be reassigned; in-progress jobs will have a new task
+that calls ``handleInterrupt``; callback jobs will resume their callback; and
completed jobs will be moved to the completed collection.
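The four cases just listed correspond to the status dispatch in the queue.py
hunk earlier in this diff. A rough sketch, where ``queue`` and ``agent`` are
stand-ins for the real objects and statuses are plain strings:

```python
def clean_up_job(job, queue, agent):
    """Route one job left behind by a dead dispatcher."""
    status = job.status
    if status == 'NEW':
        queue.put(job)                  # reassign untouched jobs
    elif status == 'ACTIVE':
        queue.put(job.handleInterrupt)  # let the retry policy decide
    elif status == 'CALLBACKS':
        queue.put(job.resumeCallbacks)  # finish the remaining callbacks
    elif status == 'COMPLETED':
        agent.completed.add(job)        # just record the completion
```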
>>> alt_agent = alt_da['main'] = StubAgent()
@@ -755,16 +783,385 @@
4
>>> queue[1] is jobA
True
- >>> queue[2].callable == jobB.fail
+ >>> queue[2].callable == jobB.handleInterrupt
True
>>> queue[3].callable == jobC.resumeCallbacks
True
>>> alt_agent.completed.first() is jobD
True
+As an aside, notice that the ``handleInterrupt`` and ``resumeCallbacks`` jobs
+have custom error log levels, and custom retry policies.
+
+ >>> import logging
+ >>> queue[2].effective_error_log_level == logging.CRITICAL
+ True
+ >>> queue[3].effective_error_log_level == logging.CRITICAL
+ True
+ >>> queue[2].retry_policy_factory is zc.async.job.RetryCommonForever
+ True
+ >>> queue[3].retry_policy_factory is zc.async.job.RetryCommonForever
+ True
+
If you have multiple workers, it is strongly suggested that you get the
associated servers connected to a shared time server.
+Rescheduled Jobs
+----------------
+
+When a job is interrupted--it was active when it stopped--it asks its retry
+policy whether to retry. The policy can respond in one of three ways:
+
+- yes, please retry right now;
+
+- yes, please retry later; or
+
+- no, please do not retry, and fail instead.
+
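A dispatcher consuming these replies might branch as in this sketch. The
``'retry_now'``, ``'retry_at'``, and ``'fail'`` tags are illustrative names,
not zc.async API; the type checks follow the datetime and timedelta replies
exercised in the doctests below.

```python
import datetime

def interpret_retry_reply(reply, now):
    """Map a retry-policy reply onto one of the three outcomes above."""
    if reply is True:
        return ('retry_now', None)
    if isinstance(reply, datetime.datetime):
        return ('retry_at', reply)        # absolute reschedule time
    if isinstance(reply, datetime.timedelta):
        return ('retry_at', now + reply)  # relative reschedule time
    return ('fail', None)                 # False (or anything else): fail
```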
+All three of these possibilities potentially affect the queue in different
+ways, so we will look at examples.
+
+First, let's look at the simple cases for all three.
+
+The scenario above lets us look at what is probably the most common case: the
+default retry policy. Let's claim the ``handleInterrupt`` task.
+
+ >>> len(queue)
+ 4
+ >>> j = alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ >>> j.callable == jobB.handleInterrupt
+ True
+ >>> len(queue)
+ 3
+
+The default retry policy's behavior is to retry nine times, for a total of ten
+attempts, and to request that the original job be retried as soon as
+possible--first in line. That behavior will become more interesting when we
+look at interaction with quotas below. For now, performing this job will push
+jobB back into the queue--again, notably, in the very first place in the queue.
+
+ >>> j()
+ >>> len(queue)
+ 4
+ >>> queue[0] is jobB
+ True
+ >>> jobB is alt_agent.claimJob()
+ True
+ >>> jobB()
+ 42
+ >>> len(queue)
+ 3
+
+That was an example of the retry policy saying "please retry right now":
+returning ``True``. Now let's look at the other two possible responses.
+
+"please retry later": with a datetime.
+
+ >>> import persistent
+ >>> import datetime
+ >>> class StubRetryPolicy(persistent.Persistent):
+ ... zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+ ... _reply = datetime.datetime(3000, 1, 1)
+ ... def __init__(self, job):
+ ... self.parent = self.__parent__ = job
+ ... def jobError(self, failure, data_cache):
+ ... return self._reply
+ ... def commitError(self, failure, data_cache):
+ ... return self._reply
+ ... def interrupted(self):
+ ... return self._reply
+ ... def updateData(self, data_cache):
+ ... pass
+ ...
+
+ >>> j = zc.async.job.Job(mock_work)
+ >>> j._status = zc.async.interfaces.ACTIVE # hack internals for test
+ >>> j.retry_policy_factory = StubRetryPolicy
+ >>> j_wrap = queue.put(j.handleInterrupt)
+ >>> len(queue)
+ 4
+ >>> j_wrap is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ True
+ >>> len(queue)
+ 3
+ >>> j_wrap()
+ >>> len(queue)
+ 4
+ >>> queue[3] is j
+ True
+ >>> j.begin_after
+ datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+...with a timedelta.
+
+ >>> j = zc.async.job.Job(mock_work)
+ >>> j._status = zc.async.interfaces.ACTIVE # hack internals for test
+ >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j.retry_policy_factory = StubRetryPolicy
+ >>> j_wrap = queue.put(j.handleInterrupt)
+ >>> len(queue)
+ 5
+ >>> j_wrap is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ True
+ >>> len(queue)
+ 4
+ >>> j_wrap()
+ >>> len(queue)
+ 5
+ >>> queue[3] is j
+ True
+ >>> j.begin_after
+ datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+"please do not retry": does not affect the queue, so we will not show it here.
+
+The trickiest aspects occur with quotas.
+
+"please retry now": with a quota. In this case, the job should retain its
+precise place in the quota if the quota has a limit of one, and should remain
+in the quota if the quota has a greater limit.
+
+ >>> quota = queue.quotas['content catalog']
+ >>> quota[0]()
+ 42
+ >>> quota.clean()
+ >>> len(quota)
+ 0
+ >>> j = queue.put(mock_work)
+ >>> j.quota_names = ('content catalog',)
+ >>> j2 = queue.put(mock_work)
+ >>> j2.quota_names = ('content catalog',)
+ >>> transaction.commit()
+ >>> len(queue)
+ 7
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ True
+ >>> list(quota) == [j]
+ True
+
+ >>> j._status = zc.async.interfaces.ACTIVE # fake beginning to call
+ >>> alt_agent.remove(j)
+ >>> j_wrap = queue.put(j.handleInterrupt)
+
+The ``j`` job still claims the space in the quota, so ``j2`` can't be
+claimed.
+
+ >>> print alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ None
+
+Now we'll get the ``handleInterrupt`` task. The ``j`` job still cannot be
+removed from the quota.
+
+ >>> j_wrap is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ True
+ >>> len(quota)
+ 1
+ >>> quota.clean()
+ >>> list(quota) == [j]
+ True
+
+Now we'll perform the task. The ``j`` job returns to the queue in a
+``PENDING`` status, and the quota still cannot be cleared.
+
+ >>> j_wrap()
+ >>> len(queue)
+ 7
+ >>> queue[0] is j
+ True
+ >>> len(quota)
+ 1
+ >>> quota.clean()
+ >>> list(quota) == [j]
+ True
+
+The agent can claim job ``j`` again, however.
+
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ True
+
+Still cannot clear the quota.
+
+ >>> len(quota)
+ 1
+ >>> quota.clean()
+ >>> list(quota) == [j]
+ True
+
+Once we perform j, the quota can be cleared, and j2 can be claimed, as usual.
+
+ >>> j()
+ 42
+
+ >>> j2 is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ True
+
+"please retry later": with a quota. In this case, other jobs can take the
+place of the original job in the quota.
+
+ >>> j3 = queue.put(mock_work)
+ >>> j3.quota_names = ('content catalog',)
+ >>> transaction.commit()
+ >>> j2.retry_policy_factory = StubRetryPolicy
+ >>> list(quota) == [j2]
+ True
+
+ >>> j2._status = zc.async.interfaces.ACTIVE # fake beginning to call
+ >>> alt_agent.remove(j2)
+ >>> j2_wrap = queue.put(j2.handleInterrupt)
+ >>> j2_wrap is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ True
+ >>> len(queue)
+ 6
+ >>> j2_wrap()
+ >>> len(queue)
+ 7
+ >>> queue[-2] is j2
+ True
+ >>> j2.status == zc.async.interfaces.PENDING
+ True
+
+ >>> j3 is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ True
+ >>> j3()
+ 42
+
+A retry policy can also request rescheduling from a jobError or a commitError,
+although the default retry policies do not.
+
+jobError: "please reschedule now"
+
+ >>> def raiseAnError():
+ ... raise RuntimeError
+ ...
+ >>> StubRetryPolicy._reply = datetime.timedelta()
+ >>> j = queue.put(raiseAnError, retry_policy_factory=StubRetryPolicy)
+ >>> len(queue)
+ 7
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'raiseAnError')
+ ...
+ True
+ >>> len(queue)
+ 6
+ >>> j() is j
+ True
+ >>> len(queue)
+ 7
+ >>> queue.claim() is j
+ True
+
+jobError: "please reschedule later"
+
+ >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j = queue.put(raiseAnError, retry_policy_factory=StubRetryPolicy)
+ >>> len(queue)
+ 7
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'raiseAnError')
+ ...
+ True
+ >>> len(queue)
+ 6
+ >>> j() is j
+ True
+ >>> len(queue)
+ 7
+ >>> queue[-2] is j
+ True
+ >>> j.begin_after
+ datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+commitError: "please reschedule now"
+
+ >>> error_flag = False
+ >>> old_commit = transaction.TransactionManager.commit
+ >>> def commit(self):
+ ... global error_flag
+ ... if error_flag:
+ ... error_flag = False
+ ... raise ValueError()
+ ... old_commit(self)
+ ...
+ >>> transaction.TransactionManager.commit = commit
+ >>> def causeCommitError():
+ ... global error_flag
+ ... error_flag = True
+ ...
+ >>> StubRetryPolicy._reply = datetime.timedelta()
+ >>> j = queue.put(causeCommitError, retry_policy_factory=StubRetryPolicy)
+ >>> len(queue)
+ 8
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'causeCommitError')
+ ...
+ True
+ >>> len(queue)
+ 7
+ >>> j() is j
+ True
+ >>> len(queue)
+ 8
+ >>> queue.claim() is j
+ True
+
+commitError: "please reschedule later"
+
+ >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j = queue.put(causeCommitError, retry_policy_factory=StubRetryPolicy)
+ >>> len(queue)
+ 8
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'causeCommitError')
+ ...
+ True
+ >>> len(queue)
+ 7
+ >>> j() is j
+ True
+ >>> len(queue)
+ 8
+ >>> queue[-2] is j
+ True
+ >>> j.begin_after
+ datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+
+ >>> transaction.TransactionManager.commit = old_commit
+
=========
Footnotes
=========
@@ -801,12 +1198,12 @@
Traceback (most recent call last):
...
KeyError: 'foo'
-
+
>>> container['foo'] = None
Traceback (most recent call last):
...
ValueError: value must be IQueue
-
+
>>> del container['']
>>> len(container)
0
Modified: zc.async/branches/dev/src/zc/async/testing.py
===================================================================
--- zc.async/branches/dev/src/zc/async/testing.py 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/testing.py 2008-06-17 02:38:06 UTC (rev 87444)
@@ -13,7 +13,9 @@
##############################################################################
import threading
import bisect
-import time
+from time import sleep as time_sleep # this import style is intentional, so
+# that test monkeypatching of time.sleep does not affect the usage in this
+# module
import datetime
import pytz
@@ -51,6 +53,9 @@
raw = super(_datetime, self).__repr__()
return "datetime.datetime%s" % (
raw[raw.index('('):],)
+ def __add__(self, other):
+ return _datetime(
+ *super(_datetime,self).__add__(other).__reduce__()[1])
def __reduce__(self):
return (argh, super(_datetime, self).__reduce__()[1])
def argh(*args, **kwargs):
@@ -103,7 +108,7 @@
assert _when == 'before' and _event == 'shutdown', (
'unsupported trigger')
self.triggers.append((_when, _event, _callable, args, kwargs))
-
+
def callInThread(self, _callable, *args, **kw):
# very naive should be fine...
thread = threading.Thread(target=_callable, args=args, kwargs=kw)
@@ -177,7 +182,7 @@
break
else:
break
- time.sleep(0.1)
+ time_sleep(0.1)
else:
print 'TIME OUT'
@@ -189,7 +194,7 @@
for i in range(seconds * 10):
if len(dispatcher.polls) > count:
return dispatcher.polls.first()
- time.sleep(0.1)
+ time_sleep(0.1)
else:
assert False, 'no poll!'
@@ -198,7 +203,7 @@
t = transaction.begin()
if job.status == zc.async.interfaces.COMPLETED:
return job.result
- time.sleep(0.1)
+ time_sleep(0.1)
else:
assert False, 'job never completed'
@@ -208,6 +213,6 @@
t = transaction.begin()
if name in job.annotations:
return job.annotations[name]
- time.sleep(0.1)
+ time_sleep(0.1)
else:
assert False, 'annotation never found'
Modified: zc.async/branches/dev/src/zc/async/utils.py
===================================================================
--- zc.async/branches/dev/src/zc/async/utils.py 2008-06-16 20:09:42 UTC (rev 87443)
+++ zc.async/branches/dev/src/zc/async/utils.py 2008-06-17 02:38:06 UTC (rev 87444)
@@ -14,11 +14,13 @@
import datetime
import logging
import sys
+import time
import ZEO.Exceptions
import ZODB.POSException
import rwproperty
import persistent
+import zope.minmax
import zc.dict
import pytz
import zope.bforest.periodic
@@ -58,39 +60,12 @@
def __parent__(self, value):
self._z_parent__ = None
+# for legacy databases
+Atom = zope.minmax.Maximum
-class Atom(persistent.Persistent):
- def __init__(self, value):
- self.value = value
- def __getstate__(self):
- return self.value
-
- def __setstate__(self, state):
- self.value = state
-
-class AtomDescriptor(object):
- def __init__(self, name, initial=None):
- self.name = name
- self.initial = initial
-
- def __get__(self, obj, klass=None):
- if obj is None:
- return self
- return obj.__dict__[self.name].value
-
- def __set__(self, obj, value):
- obj.__dict__[self.name].value = value
-
- def initialize(self, obj):
- obj.__dict__[self.name] = Atom(self.initial)
-
-def createAtom(name, initial):
- sys._getframe(1).f_locals[name] = AtomDescriptor(name, initial)
-
-
class Dict(zc.dict.Dict, Base):
-
+
copy = None # mask
def __setitem__(self, key, value):
@@ -264,7 +239,7 @@
if not trans_ct % 5:
log.warning(
'%d consecutive transaction errors while %s',
- ct, identifier, exc_info=True)
+ trans_ct, identifier, exc_info=True)
res = None
except EXPLOSIVE_ERRORS:
tm.abort()
@@ -277,11 +252,11 @@
tm.abort()
backoff_ct += 1
if backoff_ct == 1:
+ # import pdb; pdb.set_trace()
log.log(level,
'first error while %s; will continue in %d seconds',
identifier, backoff, exc_info=True)
- elif not backoff_ct % 5:
-
+ elif not backoff_ct % 10:
log.log(level,
'%d consecutive errors while %s; '
'will continue in %d seconds',
@@ -320,7 +295,7 @@
log.error('first error while %s; will continue in %d seconds',
identifier, backoff, exc_info=True)
elif not backoff_ct % 5:
-
+
log.error('%d consecutive errors while %s; '
'will continue in %d seconds',
backoff_ct, identifier, backoff, exc_info=True)
@@ -357,4 +332,4 @@
tm.abort()
log.error('Error while %s', identifier, exc_info=True)
res = zc.twist.Failure()
- return res
\ No newline at end of file
+ return res