[Checkins] SVN: zc.async/trunk/ svn merge -r 86847:87583
svn+ssh://svn.zope.org/repos/main/zc.async/branches/dev
Gary Poster
gary at zope.com
Thu Jun 19 21:18:18 EDT 2008
Log message for revision 87584:
svn merge -r 86847:87583 svn+ssh://svn.zope.org/repos/main/zc.async/branches/dev
Changed:
U zc.async/trunk/buildout.cfg
U zc.async/trunk/setup.py
U zc.async/trunk/src/zc/async/CHANGES.txt
U zc.async/trunk/src/zc/async/README.txt
U zc.async/trunk/src/zc/async/TODO.txt
U zc.async/trunk/src/zc/async/catastrophes.txt
U zc.async/trunk/src/zc/async/configure.py
U zc.async/trunk/src/zc/async/dispatcher.py
U zc.async/trunk/src/zc/async/dispatcher.txt
U zc.async/trunk/src/zc/async/interfaces.py
U zc.async/trunk/src/zc/async/job.py
U zc.async/trunk/src/zc/async/job.txt
U zc.async/trunk/src/zc/async/queue.py
U zc.async/trunk/src/zc/async/queue.txt
U zc.async/trunk/src/zc/async/testing.py
U zc.async/trunk/src/zc/async/utils.py
-=-
Modified: zc.async/trunk/buildout.cfg
===================================================================
--- zc.async/trunk/buildout.cfg 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/buildout.cfg 2008-06-20 01:18:18 UTC (rev 87584)
@@ -13,13 +13,13 @@
[test]
recipe = zc.recipe.testrunner
eggs = zc.async
-defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1'.split()
+defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1 --auto-color'.split()
working-directory = ${buildout:directory}
[z3test]
recipe = zc.recipe.testrunner
eggs = zc.async [z3]
-defaults = "--tests-pattern z3tests --exit-with-status -1".split()
+defaults = "--tests-pattern z3tests --exit-with-status -1 --auto-color".split()
[interpreter]
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/setup.py 2008-06-20 01:18:18 UTC (rev 87584)
@@ -18,7 +18,6 @@
try:
import docutils
except ImportError:
- import warnings
def validateReST(text):
return ''
else:
@@ -95,15 +94,16 @@
'uuid',
'zc.queue',
'zc.dict>=1.2.1',
- 'zc.twist>=1.2',
+ 'zc.twist>=1.3',
'Twisted>=8.0.1', # 8.0 was setuptools compatible, 8.0.1 had bugfixes.
- # note that Twisted builds with warnings, at least with py2.4. It
+ # note that Twisted builds with warnings with py2.4. It
# seems to still build ok.
'zope.bforest>=1.2',
'zope.component',
'zope.event',
'zope.i18nmessageid',
'zope.interface',
+ 'zope.minmax',
'zope.testing',
'rwproperty',
],
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/CHANGES.txt 2008-06-20 01:18:18 UTC (rev 87584)
@@ -1,6 +1,58 @@
1.2 (unreleased)
================
+- made the log for finding an activated agent report the pertinent queue's oid
+ as an unpacked integer, rather than the packed string blob. Use
+ ``ZODB.utils.p64`` to convert back to an oid that the ZODB will recognize.
+
+- Bugfix: in failing a job, the job thought it was in its old agent, and the
+ ``fail`` call failed. This is now tested by the first example in new doctest
+ ``catastrophes.txt``.
+
+- jobs no longer default to a ``begin_by`` value of one hour after the
+ ``begin_after``. The default now is no limit.
+
+- Made dispatcher much more robust to transaction errors and ZEO
+ ClientDisconnected errors.
+
+- Jobs now use an IRetryPolicy to decide what to do on failure within a job,
+ within the commit of the result, and if the job is interrupted. This allows
+ support of transactional jobs, transactional jobs that critically must be
+ run to completion, and non-transactional jobs such as communicating with an
+ external service.
+
+- The default retry policy supports retries for ClientDisconnected errors,
+ transaction errors, and interruptions.
+
+- ``job.txt`` has been expanded significantly to show error handling and the
+ use of retry policies. New file ``catastrophes.txt`` shows handling of other
+ catastrophes, such as interruptions to polling.
+
+- job errors now go in the main zc.async.event log rather than in the
+ zc.async.trace log. Successes continue to go in the trace log.
+
+- callback failures go to the main log as a CRITICAL error, by default.
+
+- ``handleInterrupt`` is the new protocol on jobs to inform them that they were
+ active in a dispatcher that is now dead. They either fail or reschedule,
+ depending on the associated IRetryPolicy for the job. If they reschedule,
+ the retry value should be a datetime or timedelta. The job calls the agent's
+ ``reschedule`` method. If the timedelta is empty or negative, or the datetime
+ is earlier than now, the job is put back in the queue with a new ``putBack``
+ method on the queue. This is intended to be the opposite of ``claim``. Jobs
+ put in the queue with ``putBack`` will be pulled out before any others.
+
+- convert to using zope.minmax rather than locally defined ``Atom``.
+
+- Fix (and simplify) last_ping code so as to avoid unnecessarily writing the
+ state of the parent DispatcherAgents collection to the database whenever the
+ atom changed.
+
+- Depends on new release of zc.twist (1.3)
+
+1.1.1 (2008-05-14)
+==================
+
- more README tweaks.
- converted all reports from the dispatcher, including the monitor output,
@@ -21,15 +73,6 @@
Fixed, which also means we need a new version of zope.bforest (1.2) for a new
feature there.
-- made the log for finding an activated agent report the pertinent queue's oid
- as an unpacked integer, rather than the packed string blob. As mentioned
- above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
- recognize.
-
-- Bugfix: in failing a job, the job thought it was in its old agent, and the
- ``fail`` call failed. This is now tested by the first example in new doctest
- ``catastrophes.txt``.
-
1.1 (2008-04-24)
================
@@ -49,7 +92,7 @@
- Had the ThreadedDispatcherInstaller subscriber stash the thread on the
dispatcher, so you can shut down tests like this:
-
+
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/README.txt 2008-06-20 01:18:18 UTC (rev 87584)
@@ -1087,15 +1087,16 @@
>>> t = transaction.begin()
>>> job = queue.put(
- ... send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
+ ... send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC),
+ ... datetime.timedelta(hours=1))
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
- <zc.twist.Failure zc.async.interfaces.AbortedError>
+ <zc.twist.Failure zc.async.interfaces.TimeoutError>
>>> import sys
>>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
- Failure: zc.async.interfaces.AbortedError:
+ Failure: zc.async.interfaces.TimeoutError:
.. [#job] The Job class can take arguments and keyword arguments
for the wrapped callable at call time as well, similar to Python
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/TODO.txt 2008-06-20 01:18:18 UTC (rev 87584)
@@ -1,15 +1,9 @@
-Bugs and improvements:
+For release
-- need retry tasks, particularly for callbacks
-
-- need CRITICAL logs for callbacks
-
-- when database went away, and then came back, async didn't come back.
-
- be even more pessimistic about memory for saved polls and job info in
dispatcher.
-For future versions:
+Bugs and improvements:
- queues should be pluggable like agent with filter
- show how to broadcast, maybe add conveniences
@@ -36,10 +30,17 @@
Notice that all of the tests in this package use FileStorage.
* callbacks should be very, very quick, and very reliable. If you want to do
something that might take a while, put another job in the queue
-
+More docs:
+
+- custom retry policies, particularly for non-transactional tasks;
+
+- changing the default retry policy, per-process and per-agent; and
+
+- changing the default log level for queue jobs, callback jobs, and per-job.
+
+
For some other package, maybe:
- TTW Management and logging views, as in zasync (see goals in the "History"
section of the README).
-
\ No newline at end of file
Modified: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/catastrophes.txt 2008-06-20 01:18:18 UTC (rev 87584)
@@ -1,59 +1,485 @@
-Catastrophes
-============
+Recovering from Catastrophes
+============================
-Sometimes bad things happen in the course of processing tasks. You might have
-a MemoryError while processing your main job, or some other failure might
-happen. That's bad enough. Of course, you can register some callbacks to
-handle the error, to do what you need to recover.
+------------
+Introduction
+------------
-But then what if the callback itself fails? Perhaps the situation that caused
-the main job to fail with a MemoryError will let the callback start, but not
-complete. Then when a sibling dispatcher handles the incomplete job, the
-callback will fail.
+Sometimes bad things happen in the course of processing tasks. What might go
+wrong? How does zc.async handle these errors? What are your responsibilities?
-You, the user, do have some responsibilities. Callbacks should be very fast and
-light. If you want to do something that takes a long time, or might take a long
-time, have your callback put a new job in a queue for the long job. The
-callback itself should then complete, quickly out of the way.
+First, what might go wrong?
-However, zc.async also has important responsibilities.
+- zc.async could have a problem while polling for jobs. We'll call this a
+ "polling exception."
-This document examines catastrophes like the ones outlined above, to show how
-zc.async handles them, and how you can configure zc.async for these situations.
-Other documents in zc.async, such as the "Dead Dispatchers" section of
-queue.txt, look at this with some isolation and stubs; this document uses a
-complete zc.async set up to examine the system holistically [#setUp]_.
+- zc.async could have a problem while performing a particular job. We'll call
+ this a "job-related exception."
-These are the scenarios we'll contemplate:
+For the purpose of this discussion, we will omit the possibility that zc.async
+has a bug. That is certainly a possibility, but the recovery story is not
+predictable, and if we knew of a bug, we'd try to fix it, rather than discuss
+it here!
-- The system has a single dispatcher. The dispatcher is working on a job with a
- callback. The dispatcher dies, and then restarts, cleaning up. We'll do two
- variants, one with a graceful shutdown and one with a hard crash.
+We'll discuss both polling exceptions and job-related exceptions, then drill
+down into some specific scenarios. This will illuminate how your code and
+zc.async's can work together to handle them.
-- The system has two dispatchers. One dispatcher is working on a job with a
- callback, and then dies. The other dispatcher cleans up.
+Polling Exceptions
+------------------
-- The system has a single dispatcher. The dispatcher is working on a job, and
- successfully completes it. The callback begins, and then fails.
+Polling exceptions are, at least in theory, the least of your worries. You
+shouldn't have to worry about them; and if you do, it is probably a basic
+configuration problem that you need to address, such as making sure that the
+dispatcher process has access to the needed databases and software; or making
+sure that the dispatcher process is run by a daemonizing software that will
+restart if needed, such as zdaemon (http://pypi.python.org/pypi/zdaemon) or
+supervisor (http://supervisord.org/).
-- The system has a single dispatcher. The dispatcher is working on a job, and
- successfully completes it. The callback begins, and then the dispatcher
- dies.
+zc.async is largely responsible for dealing with polling exceptions. What does
+it have to handle?
-- The system has a single dispatcher. The database goes away, and then comes
- back.
+- The process running the poll ends, perhaps in the middle of a poll.
--------------------------------------------------
-Dispatcher Dies Gracefully While Performing a Job
--------------------------------------------------
+- zc.async cannot commit a transaction during the poll, for instance because of
+ a ConflictError, or because the database is unavailable.
+What needs to happen to handle these problems?
+
+Process Ends
+............
+
+If the process ends, your daemonizing front-end (zdaemon, supervisor, etc.)
+needs to restart it. The ZODB will discard incomplete transaction data, if any.
+
+The only thing a zc.async dispatcher needs to handle is clean up.
+
+- Ideally it will be able to deactivate its record in the ZODB during the
+ process shutdown.
+
+- Instead, if it was a "hard crash" that didn't allow deactivation, a sibling
+ dispatcher will realize that the dispatcher is down and deactivate it.
+
+- Or, finally, if it was a hard crash without a sibling, and the daemon
+ restarts a process for the original dispatcher instance, the new process
+ needs to realize that the old process is dead, not competing with it.
+
+Transaction Error
+.................
+
+If the poll gets a conflict error, it should simply abort and retry the poll,
+forever, with a small back-off.
+
+If the database goes away (perhaps the ZEO server goes down for a bit, and the
+ZEO client to which the dispatcher is connected is trying to reconnect) it
+should gracefully try to wait for the database to return, and resume when it
+does.
+
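+To make the intended behavior concrete, here is a minimal sketch--not
+zc.async's actual implementation--of a poll loop that retries forever with a
+small, capped back-off. The ``poll`` callable and the function name are
+illustrative only; the back-off constants mirror the thread-pool values used
+elsewhere in this changeset::
+
+    import time
+
+    import transaction
+    import ZEO.Exceptions
+    import ZODB.POSException
+
+    def poll_forever(poll, initial=5, increment=5, maximum=60):
+        backoff = initial
+        while True:
+            try:
+                poll()  # one polling pass; commits its own transaction
+            except (ZODB.POSException.TransactionError,
+                    ZEO.Exceptions.ClientDisconnected):
+                transaction.abort()  # discard the failed transaction
+                time.sleep(backoff)  # wait, then try again
+                backoff = min(maximum, backoff + increment)
+            else:
+                backoff = initial  # success: reset the back-off
+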
+Other, more dramatic errors, such as POSKey errors, are generally considered to
+be out of zc.async's domain and control. It should ideally continue to try to
+resume as long as the process is alive, in case somehow the situation improves,
+but this may be difficult and the expectations for zc.async's recovery are
+lower than with ConflictErrors and ClientDisconnected errors.
+
+Summary of Polling Exceptions
+.............................
+
+To repeat, then, polling exceptions have two basic scenarios.
+
+If a dispatcher process ends, it needs to deactivate its record in the ZODB, or
+let another process know to deactivate it.
+
+If a ZODB.POSException.ConflictError occurs, retry forever with a small
+backoff; or if ZEO.Exceptions.ClientDisconnected occurs, retry forever with a
+small backoff, waiting for the database to come back.
+
+Most anything else will ideally keep zc.async attempting to re-poll, but it may
+not happen: expectations are lower.
+
+Job-Related Exceptions
+----------------------
+
+What about job-related exceptions? Responsibility for handling job-related
+exceptions is shared between your code and zc.async's. What might happen?
+
+- Your job might fail internally.
+
+- The process running your task ends before completing your task.
+
+- zc.async cannot commit a transaction after your task completes, for instance
+ because of a ConflictError, or because the database is unavailable.
+
+What should occur to handle these problems?
+
+Job Fails
+.........
+
+As discussed elsewhere, if your job fails in your own code, this is mostly
+your responsibility. You should handle possible errors both within your job's
+code, and in callbacks, as appropriate.
+
+The other tool at your disposal for this situation, as with others below, is a
+retry policy. Retry policies let you determine what zc.async should do when
+your job fails. The default retry policy for job failures (as well as commit
+failures, below) is that transaction errors, such as conflict errors, are
+retried five times, and a ZEO ClientDisconnected error is retried forever with
+a backoff. You can customize these.
+
+Other than supporting these tools, zc.async's only other responsibilities are
+to report.
+
+By default, zc.async will log a failure of a job entered in a queue at the
+"ERROR" level in the ``zc.async.events`` log, and it will log a failure of a
+callback or other internal job at the "CRITICAL" level. This can be controlled
+per-process and per-job, as we'll see below. These tracebacks include
+information about the local and global variables for each frame in the stack,
+which can be useful to deduce the problem that occurred.
+
+zc.async also includes a ``Failure`` object on the job as a result, to let you
+react to the problem in a callback, and analyze it later. This is discussed in
+detail in other documents.
+
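+For instance, a failure callback along these lines (a sketch only; the job and
+callable names are illustrative) could examine the ``Failure`` and decide what
+to do next::
+
+    def handle_failure(failure):
+        # ``failure`` is the zc.twist.Failure stored as the job's result
+        return 'handled: %s' % (failure.getErrorMessage(),)
+
+    job = queue.put(risky_call)  # ``risky_call`` stands in for your own job
+    callback = job.addCallbacks(failure=handle_failure)
+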
+Process Ends
+............
+
+If a process ends while it is performing a job, that is similar, in large part,
+to the possibility of the process ending while polling: we need to restart
+the process, and realize that we had started the job. But should we restart the
+job, or abort it?
+
+Answering this question is a matter of policy, and requires knowing what each
+job does.
+
+Generally, if a job is fully transactional, such as writing something to the
+ZODB, and the job has not timed out yet, you'll want to restart it. You might
+want to restart some reasonably large number of times, and then suspect that,
+since you can't seem to finish the job, maybe the job is causing the process to
+die, and you should abort. Or perhaps you want to restart forever.
+
+If the job isn't transactional, such as communicating with an external service,
+you might want to abort the job, and set up some callbacks to handle the
+fallout.
+
+As we'll see below, zc.async defaults to guessing that jobs placed directly in
+a queue are transactional, and can be tried up to ten times; and that jobs
+used as callbacks are also transactional, and can be tried until they
+succeed. The defaults can be changed and the behavior of an individual
+job can be changed.
+
+These settings are controlled with a RetryPolicy, discussed below.
+
+Transaction Error
+.................
+
+Handling transaction errors after processing a job is also similar to the
+handling of transaction errors for polling exceptions. ConflictErrors and
+ClientDisconnected errors should often cause jobs to be aborted and restarted.
+However, if the job is not transactional, such as communicating with an
+external service, a simple abort and retry may be hazardous. Also, many jobs
+should be stopped if they retry on ConflictError more than some number of
+times--a heuristic bellwether--with the logic that they may simply be doing
+something too problematic, and they are blocking other tasks from starting. But
+other jobs should be retried until they complete.
+
+As mentioned above, zc.async defaults to guessing that jobs are transactional.
+ClientDisconnected errors are retried forever, with a small backoff. Jobs
+placed in a queue retry transaction errors, such as ConflictErrors, four times,
+while callbacks retry them forever. The defaults can be changed and the
+behavior of an individual job can be changed, using the RetryPolicy described
+below.
+
+Summary of Job-Related Exceptions
+.................................
+
+If an exception occurs in your job's code, zc.async will log it as an ERROR
+for a main queue job and as CRITICAL for a callback; and it will make the
+result of the call a ``Failure`` with error information, as shown elsewhere.
+Everything else is your responsibility, to be handled with try:except or
+try:finally blocks in your code, callbacks, or custom RetryPolicies.
+
+Process death, conflict errors, and ``ClientDisconnected`` errors all may need
+to be handled differently for different jobs. zc.async has a default policy for
+jobs placed in a queue, and for callback jobs. The default policy, a
+RetryPolicy, can be changed and can be set explicitly per-job.
+
+Your Responsibilities
+---------------------
+
+As the author of a zc.async job, your responsibilities, then, are to handle
+your own exceptions; and to make sure that the retry policy for each job is
+appropriate. This is controlled with an IRetryPolicy, as shown below.
+
+As someone configuring a running dispatcher, you need to make sure that you
+give the dispatcher the necessary access to databases and software to perform
+your jobs, and you need to review (and rotate!) your logs.
+
+zc.async's Responsibilities
+---------------------------
+
+zc.async needs to have polling robust in the face of restarts, ConflictErrors
+and ClientDisconnected errors. It needs to give your code a chance to decide
+what to do in these circumstances, and log your errors.
+
+Retry Policies
+--------------
+
+The rest of the document uses scenarios to illustrate how zc.async handles
+errors, and how you might want to configure retry policies.
+
+What is a retry policy? It is used in three circumstances.
+
+- When the job starts but fails to complete because the system is interrupted,
+ the job will try to call ``retry_policy.interrupted()`` to get a boolean as
+ to whether the job should be retried.
+
+- When the code the job ran fails, the job will try to call
+ ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to whether
+ the job should be retried.
+
+- When the commit fails, the job will try to call
+ ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
+ whether the job should be retried.
+
+Why does this need to be a policy? Can't it be a simpler arrangement?
+
+The heart of the problem is that different jobs need different error
+resolutions.
+
+In some cases, jobs may not be fully transactional. For instance, the job
+may be communicating with an external system, such as a credit card system.
+The retry policy here should typically be "never": perhaps a callback should be
+in charge of determining what to do next.
+
+If a job is fully transactional, it can be retried. But even then the desired
+behavior may differ.
+
+- In typical cases, some errors should simply cause a failure, while other
+ errors, such as database conflict errors, should cause a limited number of
+ retries.
+
+- In some jobs, conflict errors should be retried forever, because the job must
+ be run to completion or else the system should fall over. Callbacks that try
+ to handle errors themselves may take this approach, for instance.
+
+zc.async currently ships with three retry policies.
+
+1. The default, appropriate for most fully transactional jobs, is the
+ zc.async.job.RetryCommonFourTimes.
+
+2. The other available (pre-written) option for transactional jobs is
+ zc.async.job.RetryCommonForever. Callbacks will get this policy by
+ default.
+
+Both of these policies retry ZEO disconnects forever. They retry interruptions
+and transaction errors such as conflicts either four times (for a total of
+five attempts) or forever, respectively.
+
+3. The last retry policy is zc.async.job.NeverRetry. This is appropriate for
+ non-transactional jobs. You'll still typically need to handle errors in
+ your callbacks.
+
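+As a sketch only--the class is illustrative and not part of zc.async's
+documented API--a custom policy for a non-transactional job that tolerates a
+couple of interruptions but never retries errors might be written against the
+``IRetryPolicy`` methods like this (job.txt shows how retry policies are
+actually attached to jobs)::
+
+    import zope.interface
+    import zc.async.interfaces
+
+    class RetryInterruptionsOnly(object):
+        zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+
+        def __init__(self, job):
+            self.job = job
+            self.data = {}  # simple stand-in for persistent policy data
+
+        def jobError(self, failure, data_cache):
+            return False  # never retry errors raised by the job's own code
+
+        def commitError(self, failure, data_cache):
+            return False  # never retry commit failures either
+
+        def interrupted(self):
+            # tolerate up to two interruptions, then give up
+            count = self.data.get('interruptions', 0) + 1
+            self.data['interruptions'] = count
+            return count <= 2
+
+        def updateData(self, data_cache):
+            self.data.update(data_cache)
+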
+Scenarios
+---------
+
+We'll examine polling error scenarios and job error scenarios.
+
+- Polling errors
+
+ * The system is polling and gets a ConflictError.
+
+ * The system is polling and gets a ClientDisconnected error.
+
+- Job errors
+
+ * A worker process is working on a job with the default retry policy. The
+ process dies gracefully and restarts.
+
+ * Like the previous scenario, a worker process is working on a job with the
+ default retry policy. The process crashes hard (does not die gracefully)
+ and restarts.
+
+ * Like the previous scenario, a worker process is working on a job with the
+ default retry policy. The process crashes hard (does not die gracefully)
+ and a sibling notices and takes over.
+
+ * A worker process is working on a job with the default retry policy and gets
+ an error during the job or the commit.
+
+-------------------------
+Scenarios: Polling Errors
+-------------------------
+
+ConflictError
+-------------
+
+A common place for a conflict error is with two dispatchers trying to claim the
+same job from the queue. This example will mimic that situation.
+
+Imagine we have a full set up with a dispatcher, agent, and queue. [#setUp]_
+We'll actually replace the agent's chooser with one that behaves badly: it
+blocks, waiting for our lock.
+
+ >>> import threading
+ >>> lock1 = threading.Lock()
+ >>> lock2 = threading.Lock()
+ >>> lock1.acquire()
+ True
+ >>> lock2.acquire()
+ True
+ >>> def acquireLockAndChooseFirst(agent):
+ ... res = agent.queue.claim()
+ ... if res is not None:
+ ... lock2.release()
+ ... lock1.acquire()
+ ... return res
+ ...
+ >>> import zc.async.instanceuuid
+ >>> import zc.async.interfaces
+ >>> import zc.async.testing
+ >>> import zc.async.dispatcher
+ >>> import pprint
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> pprint.pprint(zc.async.testing.get_poll(dispatcher, 0))
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
+ >>> import transaction
+ >>> _ = transaction.begin()
+ >>> queues = root[zc.async.interfaces.KEY]
+ >>> queue = queues['']
+ >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+ >>> agent = da['main']
+ >>> agent.chooser = acquireLockAndChooseFirst
+ >>> def returnSomething():
+ ... return 42
+ ...
+ >>> job = queue.put(returnSomething)
+ >>> transaction.commit()
+
+Now, when the agent tries to get our job, we'll start and commit another
+transaction that removes it from the queue. This will generate a conflict
+error for the poll's thread and transaction, because it cannot also remove the
+same job.
+
+ >>> lock2.acquire()
+ True
+ >>> _ = transaction.begin()
+ >>> job is queue.pull()
+ True
+ >>> transaction.commit()
+ >>> lock1.release()
+
+However, the ConflictError is handled, and polling continues.
+
+ >>> _ = transaction.begin()
+ >>> import zc.async.agent
+ >>> agent.chooser = zc.async.agent.chooseFirst
+ >>> transaction.commit()
+ >>> import zc.async.dispatcher
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> import zc.async.testing
+ >>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
+
+And if we put the job back, it will be performed.
+
+ >>> job is queue.put(job)
+ True
+ >>> transaction.commit()
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+
+Client Disconnected
+-------------------
+
+The story is very similar if the ZEO connection goes away for a while. We'll
+mimic a ZEO ClientDisconnected error by monkeypatching
+transaction.TransactionManager.commit.
+
+ >>> lock1.locked()
+ True
+ >>> lock2.locked()
+ True
+
+ >>> agent.chooser = acquireLockAndChooseFirst
+ >>> job = queue.put(returnSomething)
+ >>> transaction.commit()
+
+ >>> lock2.acquire()
+ True
+ >>> import ZEO.Exceptions
+ >>> def commit(self):
+ ... raise ZEO.Exceptions.ClientDisconnected()
+ ...
+ >>> import transaction
+ >>> old_commit = transaction.TransactionManager.commit
+ >>> transaction.TransactionManager.commit = commit
+ >>> import time
+ >>> sleep_requests = []
+ >>> def sleep(i):
+ ... sleep_requests.append(i)
+ ...
+ >>> old_sleep = time.sleep
+ >>> time.sleep = sleep
+ >>> agent.chooser = zc.async.agent.chooseFirst
+ >>> transaction.commit()
+ >>> lock1.release()
+ >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [(..., 'unnamed')],
+ 'size': 3}}}
+ >>> transaction.TransactionManager.commit = old_commit
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+ >>> bool(sleep_requests)
+ True
+
+Here's another variant that mimics being unable to read the storage during a
+poll, and then recuperating.
+
+ >>> error_raised = False
+ >>> def raiseDisconnectedThenChooseFirst(agent):
+ ... global error_raised
+ ... if not error_raised:
+ ... error_raised = True
+ ... raise ZEO.Exceptions.ClientDisconnected()
+ ... return agent.queue.claim()
+ >>> agent.chooser = raiseDisconnectedThenChooseFirst
+ >>> def returnSomething():
+ ... return 42
+ ...
+ >>> job = queue.put(returnSomething)
+ >>> transaction.commit()
+ >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
+ {'': {'main': {'active jobs': [],
+ 'error': <zc.twist.Failure ...ClientDisconnected>,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+
+-----------------------------
+Scenarios: Job-Related Errors
+-----------------------------
+
+Graceful Shutdown During Job
+----------------------------
+
First let's consider how a failed job with a callback or two is handled when
the dispatcher dies.
Here we start a job.
>>> import zope.component
- >>> import threading
>>> import transaction
>>> import zc.async.interfaces
>>> import zc.async.testing
@@ -63,29 +489,35 @@
>>> lock = threading.Lock()
>>> lock.acquire()
True
+ >>> fail_flag = True
>>> def wait_for_me():
- ... lock.acquire()
- ... lock.release() # so we can use the same lock again later
- ... raise SystemExit() # this will cause the worker thread to exit
+ ... global fail_flag
+ ... if fail_flag:
+ ... fail_flag = False
+ ... lock.acquire()
+ ... lock.release() # so we can use the same lock again later
+ ... raise SystemExit() # this will cause the worker thread to exit
+ ... else:
+ ... return 42
...
- >>> def handle_error(result):
- ... return '...I handled the error...'
+ >>> def handle_result(result):
+ ... return 'I got result %r' % (result,)
...
>>> job = queue.put(wait_for_me)
- >>> callback_job = job.addCallbacks(failure=handle_error)
+ >>> callback_job = job.addCallback(handle_result)
>>> transaction.commit()
>>> dispatcher = zc.async.dispatcher.get()
>>> poll = zc.async.testing.get_poll(dispatcher)
>>> wait_for_start(job)
-In this scenario, ``wait_for_me`` is a job that will "unexpectedly" be lost
-while the dispatcher stops working. ``handle_error`` is the hypothetical
-handler that should be called if the ``wait_for_me`` job fails.
+In this scenario, ``wait_for_me`` is a job that, the first time it is run, will
+"unexpectedly" be lost while the dispatcher stops working. ``handle_result``
+will simply show us that callbacks will be called successfully.
The job has started. Now, the dispatcher suddenly dies without the thread
performing ``wait_for_me`` getting a chance to finish. For our first example,
let's give the dispatcher a graceful exit. The dispatcher gets a chance to
-clean up its dispatcher agents, and job.fail() goes into the queue.
+clean up its dispatcher agents, and job.handleInterrupt() goes into the queue.
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
>>> wait_to_deactivate(dispatcher)
@@ -94,37 +526,44 @@
True
>>> len(queue)
1
- >>> fail_job = queue[0]
- >>> fail_job
- <zc.async.job.Job (oid 51, db 'unnamed') ``zc.async.job.Job (oid 30, db 'unnamed') :fail()``>
- >>> queue[0].callable
- <bound method Job.fail of <zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.wait_for_me()``>>
+ >>> interrupt_job = queue[0]
+ >>> interrupt_job # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[0].callable # doctest: +ELLIPSIS
+ <bound method Job.handleInterrupt of <...Job ... ``...wait_for_me()``>>
-Now when the process starts back up again, our callback will be performed.
+Now when the process starts back up again, ``handleInterrupt`` checks with the
+default retry policy as to what should be done. It requests that the job be
+retried. It's put back in the queue, and it is called again normally.
>>> old_dispatcher = dispatcher
>>> zc.async.dispatcher.clear()
>>> zc.async.subscribers.ThreadedDispatcherInstaller(
... poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
>>> dispatcher = zc.async.dispatcher.get()
- >>> zc.async.testing.wait_for_result(fail_job)
- >>> job.status == zc.async.interfaces.COMPLETED
- True
- >>> job.result
- <zc.twist.Failure zc.async.interfaces.AbortedError>
+ >>> zc.async.testing.wait_for_result(interrupt_job)
+
+Now we need to wait for the job.
+
+ >>> zc.async.testing.wait_for_result(job)
+ 42
>>> callback_job.status == zc.async.interfaces.COMPLETED
True
>>> callback_job.result
- '...I handled the error...'
+ 'I got result 42'
-So, our callback had a chance to do whatever it thought appropriate--in this
-case, simply returning a string--once the dispatcher got back online
-[#cleanup1]_.
+The job now has a retry policy with some values that, while not currently part
+of the interface, are still worth showing here.
-------------------------------------------------
-Dispatcher Crashes "Hard" While Performing a Job
-------------------------------------------------
+ >>> policy = job.getRetryPolicy()
+ >>> policy.data.get('interruptions')
+ 1
+This shows that the policy registered one interruption. [#cleanup1]_
+
+Hard Crash During Job
+---------------------
+
Our next catastrophe only changes one aspect to the previous one: the
dispatcher does not stop gracefully, and does not have a chance to clean up its
active jobs. It is a "hard" crash.
@@ -136,8 +575,9 @@
>>> lock.acquire()
True
+ >>> fail_flag = True
>>> job = queue.put(wait_for_me)
- >>> callback_job = job.addCallbacks(failure=handle_error)
+ >>> callback_job = job.addCallback(handle_result)
>>> transaction.commit()
>>> dispatcher = zc.async.dispatcher.get()
>>> poll = zc.async.testing.get_poll(dispatcher)
@@ -156,11 +596,12 @@
where ``UUID`` is the uuid of that dispatcher.
The ``DispatcherAgents`` object has four pertinent attributes:
-``ping_interval``, ``ping_death_interval``, ``last_ping``, and ``dead``. About
-every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is supposed
-to write a ``datetime`` to ``last_ping``. If the ``last_ping`` plus the
-``ping_death_interval`` (also a ``timedelta``) is older than now, the
-dispatcher is considered to be ``dead``, and old jobs should be cleaned up.
+``ping_interval``, ``ping_death_interval``, ``last_ping.value``, and ``dead``.
+About every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is
+supposed to write a ``datetime`` to ``last_ping.value``. If the
+``last_ping.value`` plus the ``ping_death_interval`` (also a ``timedelta``) is
+older than now, the dispatcher is considered to be ``dead``, and old jobs
+should be cleaned up.
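+
+In other words, the liveness check described above amounts to a comparison
+along these lines (a sketch of the rule, not the actual implementation)::
+
+    import datetime
+
+    def is_dead(last_ping_value, ping_death_interval):
+        # dead if the last recorded ping is older than the allowed interval
+        now = datetime.datetime.utcnow()
+        return last_ping_value + ping_death_interval < now
+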
The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
@@ -221,13 +662,16 @@
``zc.async.dispatcher.get().activated = False``. To stop polls
permanently, don't start a zc.async.dispatcher!)
-
To speed up the realization of our dispatcher that the previous activation is
``dead``, we'll set the ping_death_interval to just one second.
>>> _ = transaction.begin()
>>> da.dead
False
+ >>> job in da['main']
+ True
+ >>> len(queue)
+ 0
>>> import datetime
>>> da.ping_death_interval = datetime.timedelta(seconds=1)
>>> da.dead
@@ -237,41 +681,48 @@
>>> transaction.commit()
After the next poll, the dispatcher will have cleaned up its old tasks in the
-same way we saw in the previous example. The job's ``fail`` method will be
-called, and the callback will be performed. The DispatcherAgents object is
-no longer dead, because it is tied to the new instance of the dispatcher.
+same way we saw in the previous example. The job's ``handleInterrupt`` method
+will be called, and the job will be put back in the queue to be retried. The
+DispatcherAgents object is no longer dead, because it is tied to the new
+instance of the dispatcher.
>>> poll = zc.async.testing.get_poll(dispatcher)
- >>> _ = transaction.begin()
+ >>> def wait_for_pending(job):
+ ... for i in range(60):
+ ... t = transaction.begin()
+ ... if job.status == zc.async.interfaces.PENDING:
+ ... break
+ ... time_sleep(0.1)
+ ... else:
+ ... assert False, 'job never pending'
+ >>> wait_for_pending(job)
>>> job in da['main']
False
>>> bool(da.activated)
True
>>> da.dead
False
- >>> fail_job = job.parent
- >>> fail_job
- <zc.async.job.Job (oid 78, db 'unnamed') ``zc.async.job.Job (oid 57, db 'unnamed') :fail()``>
+ >>> queue[0] is job
+ True
-Let's see it happen.
+Now we need to wait for the job.
- >>> zc.async.testing.wait_for_result(fail_job)
- >>> job.status == zc.async.interfaces.COMPLETED
- True
- >>> job.result
- <zc.twist.Failure zc.async.interfaces.AbortedError>
+ >>> zc.async.testing.wait_for_result(job)
+ 42
>>> callback_job.status == zc.async.interfaces.COMPLETED
True
>>> callback_job.result
- '...I handled the error...'
+ 'I got result 42'
+ >>> policy = job.getRetryPolicy()
+ >>> policy.data.get('interruptions')
+ 1
The dispatcher cleaned up its own "hard" crash.
[#cleanup2]_
------------------------------------------------------------------
-Dispatcher Crashes "Hard" While Performing a Job, Sibling Resumes
------------------------------------------------------------------
+Hard Crash During Job with Sibling Recovery
+-------------------------------------------
Our next catastrophe is the same as the one before, except, after one
dispatcher's hard crash, another dispatcher is around to clean up the dead
@@ -285,8 +736,9 @@
>>> lock.acquire()
True
+ >>> fail_flag = True
>>> job = queue.put(wait_for_me)
- >>> callback_job = job.addCallbacks(failure=handle_error)
+ >>> callback_job = job.addCallback(handle_result)
>>> transaction.commit()
>>> dispatcher = zc.async.dispatcher.get()
>>> poll = zc.async.testing.get_poll(dispatcher)
@@ -359,8 +811,8 @@
After the second dispatcher gets a poll--a chance to notice--it will have
cleaned up the first dispatcher's old tasks in the same way we saw in the
-previous example. The job's ``fail`` method will be called, and the callback
-will be performed.
+previous example. The job's ``handleInterrupt`` method will be called, which
+in this case will put it back in the queue to be claimed and performed.
>>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
>>> _ = transaction.begin()
@@ -370,48 +822,31 @@
False
>>> da.dead
True
- >>> fail_job = job.parent
- >>> fail_job
- <zc.async.job.Job (oid 121, db 'unnamed') ``zc.async.job.Job (oid 84, db 'unnamed') :fail()``>
- >>> zc.async.testing.wait_for_result(fail_job)
- >>> job.status == zc.async.interfaces.COMPLETED
+ >>> wait_for_pending(job)
+ >>> queue[0] is job
True
- >>> job.result
- <zc.twist.Failure zc.async.interfaces.AbortedError>
+
+Now we need to wait for the job.
+
+ >>> zc.async.testing.wait_for_result(job)
+ 42
>>> callback_job.status == zc.async.interfaces.COMPLETED
True
>>> callback_job.result
- '...I handled the error...'
+ 'I got result 42'
The sibling, then, was able to clean up the mess left by the "hard" crash of
the first dispatcher.
[#cleanup3]_
---------------
-Callback Fails
---------------
+Other Job-Related Errors
+------------------------
--------------------------------
-Dispatcher Dies During Callback
--------------------------------
+Other problems--errors when performing or committing jobs--are handled within
+jobs, getting the decisions from retry policies as described above. These
+are demonstrated in the job.txt document.
-------------------------------
-Database Disappears For Awhile
-------------------------------
-
----------------------------------------------
-Other Catastrophes, And Your Responsibilities
----------------------------------------------
-
-There are some catastrophes from which there are no easy fixes like these. For
-instance, imagine you have communicated with an external system, and gotten a
-reply that you have successfully made a transaction there, but then the
-dispatcher dies, or the database disappears, before you have a chance to commit
-the local transaction recording the success. Your code needs to see that
-
-...multidatabase...
-
.. ......... ..
.. Footnotes ..
.. ......... ..
@@ -421,8 +856,8 @@
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
... 'main.fs', create=True)
- >>> from ZODB.DB import DB
- >>> db = DB(storage)
+ >>> from ZODB.DB import DB
+ >>> db = DB(storage)
>>> conn = db.open()
>>> root = conn.root()
>>> import zc.async.configure
@@ -440,13 +875,14 @@
>>> import transaction
>>> _ = transaction.begin()
- >>> import time
+ >>> from time import sleep as time_sleep # we import in this manner so
+ ... # that our testing monkey-patch of time.sleep does not affect our tests
>>> def wait_for_start(job):
... for i in range(60):
... t = transaction.begin()
... if job.status == zc.async.interfaces.ACTIVE:
... break
- ... time.sleep(0.1)
+ ... time_sleep(0.1)
... else:
... assert False, 'job never started'
@@ -454,7 +890,7 @@
... for i in range(60):
... if dispatcher.activated == False:
... break
- ... time.sleep(0.1)
+ ... time_sleep(0.1)
... else:
... assert False, 'dispatcher never deactivated'
@@ -490,3 +926,4 @@
>>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
>>> alt_dispatcher.thread.join(3)
>>> alt_dispatcher.dead_pools[0].threads[0].join(3)
+ >>> time.sleep = old_sleep
Modified: zc.async/trunk/src/zc/async/configure.py
===================================================================
--- zc.async/trunk/src/zc/async/configure.py 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/configure.py 2008-06-20 01:18:18 UTC (rev 87584)
@@ -56,4 +56,4 @@
def base():
# see comment in ``minimal``, above
minimal()
- zope.component.provideAdapter(zc.twist.connection)
\ No newline at end of file
+ zope.component.provideAdapter(zc.twist.connection)
Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/dispatcher.py 2008-06-20 01:18:18 UTC (rev 87584)
@@ -21,6 +21,7 @@
import twisted.python.failure
import twisted.internet.defer
import ZODB.POSException
+import ZEO.Exceptions
import ZODB.utils
import BTrees
import transaction
@@ -55,7 +56,7 @@
def __init__(self):
self._event = threading.Event()
-
+
def setResult(self, value):
self.result = value
self._event.set()
@@ -119,6 +120,9 @@
class AgentThreadPool(object):
_size = 0
+ initial_backoff = 5
+ incremental_backoff = 5
+ maximum_backoff = 60
def __init__(self, dispatcher, name, size):
self.dispatcher = dispatcher
@@ -134,34 +138,54 @@
local.dispatcher = self.dispatcher
conn = self.dispatcher.db.open()
try:
- job = self.queue.get()
- while job is not None:
- identifier, dbname, info = job
+ job_info = self.queue.get()
+ while job_info is not None:
+ identifier, dbname, info = job_info
info['thread'] = thread.get_ident()
info['started'] = datetime.datetime.utcnow()
zc.async.utils.tracelog.info(
'starting in thread %d: %s',
info['thread'], info['call'])
+ backoff = self.initial_backoff
+ conflict_retry_count = 0
try:
- transaction.begin()
- if dbname is None:
- local_conn = conn
- else:
- local_conn = conn.get_connection(dbname)
- job = local_conn.get(identifier)
- local.job = job
+ while 1:
+ try:
+ transaction.begin()
+ if dbname is None:
+ local_conn = conn
+ else:
+ local_conn = conn.get_connection(dbname)
+ job = local_conn.get(identifier)
+ # this setstate should trigger any initial problems
+ # within the try/except retry structure here.
+ local_conn.setstate(job)
+ local.job = job
+ except ZEO.Exceptions.ClientDisconnected:
+ zc.async.utils.log.info(
+ 'ZEO client disconnected while trying to '
+ 'get job %d in db %s; retrying in %d seconds',
+ ZODB.utils.u64(identifier), dbname or '',
+ backoff)
+ time.sleep(backoff)
+ backoff = min(self.maximum_backoff,
+ backoff + self.incremental_backoff)
+ except ZODB.POSException.TransactionError:
+ # continue, i.e., try again
+ conflict_retry_count += 1
+ if (conflict_retry_count == 1 or
+ not conflict_retry_count % 5):
+ zc.async.utils.log.warning(
+ '%d transaction error(s) while trying to '
+ 'get job %d in db %s',
+ conflict_retry_count,
+ ZODB.utils.u64(identifier), dbname or '',
+ exc_info=True)
+ # now ``while 1`` loop will continue, to retry
+ else:
+ break
try:
job() # this does the committing and retrying, largely
- except ZODB.POSException.TransactionError:
- transaction.abort()
- while 1:
- job.fail()
- try:
- transaction.commit()
- except ZODB.POSException.TransactionError:
- transaction.abort() # retry forever (!)
- else:
- break
except zc.async.interfaces.BadStatusError:
transaction.abort()
zc.async.utils.log.error( # notice, not tracelog
@@ -169,6 +193,7 @@
if job.status == zc.async.interfaces.CALLBACKS:
job.resumeCallbacks() # moves the job off the agent
else:
+ count = 0
while 1:
status = job.status
if status == zc.async.interfaces.COMPLETED:
@@ -180,17 +205,38 @@
job.fail() # moves the job off the agent
try:
transaction.commit()
- except ZODB.POSException.TransactionError:
+ except (ZODB.POSException.TransactionError,
+ ZODB.POSException.POSError):
+ if count and not count % 10:
+ zc.async.utils.log.critical(
+ 'frequent database errors! '
+ 'I retry forever...',
+ exc_info=True)
+ time.sleep(1)
transaction.abort() # retry forever (!)
else:
break
+ except zc.async.utils.EXPLOSIVE_ERRORS:
+ transaction.abort()
+ raise
+ except:
+ # all errors should have been handled by the job at
+ # this point, so anything other than BadStatusError,
+ # SystemExit and KeyboardInterrupt are bad surprises.
+ transaction.abort()
+ zc.async.utils.log.critical(
+ 'unexpected error', exc_info=True)
+ raise
# should come before 'completed' for threading dance
if isinstance(job.result, twisted.python.failure.Failure):
info['failed'] = True
info['result'] = job.result.getTraceback(
- elideFrameworkCode=True, detail='verbose')
+ elideFrameworkCode=True)
else:
info['result'] = repr(job.result)
+ if len(info['result']) > 10000:
+ info['result'] = (
+ info['result'][:10000] + '\n[...TRUNCATED...]')
info['completed'] = datetime.datetime.utcnow()
finally:
local.job = None
@@ -198,14 +244,14 @@
zc.async.utils.tracelog.info(
'completed in thread %d: %s',
info['thread'], info['call'])
- job = self.queue.get()
+ job_info = self.queue.get()
finally:
conn.close()
if self.dispatcher.activated:
# this may cause some bouncing, but we don't ever want to end
# up with fewer than needed.
self.dispatcher.reactor.callFromThread(self.setSize)
-
+
def setSize(self, size=None):
# this should only be called from the thread in which the reactor runs
# (otherwise it needs locks)
@@ -233,7 +279,7 @@
self.queue.put(None)
return size - old # size difference
-# this is mostly for testing
+# this is mostly for testing, though ``get`` comes in handy generally
_dispatchers = {}
@@ -249,6 +295,8 @@
clear = _dispatchers.clear
+# end of testing bits
+
class Dispatcher(object):
activated = False
@@ -292,57 +340,22 @@
self.dead_pools = []
def _getJob(self, agent):
- try:
- job = agent.claimJob()
- except zc.twist.EXPLOSIVE_ERRORS:
- transaction.abort()
- raise
- except:
- transaction.abort()
- zc.async.utils.log.error(
- 'Error trying to get job for UUID %s from '
- 'agent %s (oid %s) in queue %s (oid %s)',
- self.UUID, agent.name, agent._p_oid,
- agent.queue.name,
- agent.queue._p_oid, exc_info=True)
- return zc.twist.Failure()
- res = self._commit(
- 'Error trying to commit getting a job for UUID %s from '
- 'agent %s (oid %s) in queue %s (oid %s)' % (
- self.UUID, agent.name, agent._p_oid,
- agent.queue.name,
- agent.queue._p_oid))
- if res is None:
- # Successful commit
- res = job
+ identifier = (
+ 'getting job for UUID %s from agent %s (oid %d) '
+ 'in queue %s (oid %d)' % (
+ self.UUID, agent.name, ZODB.utils.u64(agent._p_oid),
+ agent.queue.name, ZODB.utils.u64(agent.queue._p_oid)))
+ res = zc.async.utils.try_transaction_five_times(
+ agent.claimJob, identifier, transaction)
+ if isinstance(res, twisted.python.failure.Failure):
+ identifier = 'stashing failure on agent %s (oid %s)' % (
+ agent.name, ZODB.utils.u64(agent._p_oid))
+ def setFailure():
+ agent.failure = res
+ zc.async.utils.try_transaction_five_times(
+ setFailure, identifier, transaction)
return res
- def _commit(self, debug_string=''):
- retry = 0
- while 1:
- try:
- transaction.commit()
- except ZODB.POSException.TransactionError:
- transaction.abort()
- if retry >= 5:
- zc.async.utils.log.error(
- 'Repeated transaction error trying to commit in '
- 'zc.async: %s',
- debug_string, exc_info=True)
- return zc.twist.Failure()
- retry += 1
- except zc.twist.EXPLOSIVE_ERRORS:
- transaction.abort()
- raise
- except:
- transaction.abort()
- zc.async.utils.log.error(
- 'Error trying to commit: %s',
- debug_string, exc_info=True)
- return zc.twist.Failure()
- else:
- break
-
def poll(self):
poll_info = PollInfo()
started_jobs = []
@@ -358,29 +371,30 @@
queue.dispatchers.register(self.UUID)
da = queue.dispatchers[self.UUID]
if queue._p_oid not in self._activated:
- if da.activated:
- if da.dead:
- da.deactivate()
- else:
- zc.async.utils.log.error(
- 'UUID %s already activated in queue %s '
- '(oid %d): another process? (To stop '
- 'poll attempts in this process, set '
- '``zc.async.dispatcher.get().activated = '
- "False``. To stop polls permanently, don't "
- 'start a zc.async.dispatcher!)',
- self.UUID, queue.name,
- ZODB.utils.u64(queue._p_oid))
- continue
- da.activate()
- self._activated.add(queue._p_oid)
- # removed below if transaction fails
- res = self._commit(
- 'Error trying to commit activation of UUID %s in '
- 'queue %s (oid %s)' % (
- self.UUID, queue.name, queue._p_oid))
- if res is not None:
- self._activated.remove(queue._p_oid)
+ identifier = (
+ 'activating dispatcher UUID %s in queue %s (oid %d)' %
+ (self.UUID, queue.name, ZODB.utils.u64(queue._p_oid)))
+ def activate():
+ if da.activated:
+ if da.dead:
+ da.deactivate()
+ else:
+ zc.async.utils.log.error(
+ 'UUID %s already activated in queue %s '
+ '(oid %d): another process? (To stop '
+ 'poll attempts in this process, set '
+ '``zc.async.dispatcher.get().activated = '
+ "False``. To stop polls permanently, don't "
+ 'start a zc.async.dispatcher!)',
+ self.UUID, queue.name,
+ ZODB.utils.u64(queue._p_oid))
+ return False
+ da.activate()
+ return True
+ if zc.async.utils.try_transaction_five_times(
+ activate, identifier, transaction) is True:
+ self._activated.add(queue._p_oid)
+ else:
continue
queue_info = poll_info[queue.name] = {}
pools = self.queues.get(queue.name)
@@ -398,7 +412,7 @@
try:
agent_info['size'] = agent.size
agent_info['len'] = len(agent)
- except zc.twist.EXPLOSIVE_ERRORS:
+ except zc.async.utils.EXPLOSIVE_ERRORS:
raise
except:
agent_info['error'] = zc.twist.Failure()
@@ -419,17 +433,6 @@
if isinstance(job, twisted.python.failure.Failure):
agent_info['error'] = job
job = None
- try:
- agent.failure = res
- except zc.twist.EXPLOSIVE_ERRORS:
- raise
- except:
- transaction.abort()
- zc.async.utils.log.error(
- 'error trying to stash failure on agent')
- else:
- # TODO improve msg
- self._commit('trying to stash failure on agent')
else:
info = {'result': None,
'failed': False,
@@ -448,8 +451,10 @@
pool.queue.put(
(job._p_oid, dbname, info))
job = self._getJob(agent)
- queue.dispatchers.ping(self.UUID)
- self._commit('trying to commit ping')
+ identifier = 'committing ping for UUID %s' % (self.UUID,)
+ zc.async.utils.try_transaction_five_times(
+ lambda: queue.dispatchers.ping(self.UUID), identifier,
+ transaction)
if len(pools) > len(queue_info):
conn_delta = 0
for name, pool in pools.items():
@@ -539,13 +544,16 @@
try:
transaction.begin()
try:
- queues = self.conn.root().get(zc.async.interfaces.KEY)
- if queues is not None:
- for queue in queues.values():
- da = queue.dispatchers.get(self.UUID)
- if da is not None and da.activated:
- da.deactivate()
- self._commit('trying to tear down')
+ identifier = 'cleanly deactivating UUID %s' % (self.UUID,)
+ def deactivate_das():
+ queues = self.conn.root().get(zc.async.interfaces.KEY)
+ if queues is not None:
+ for queue in queues.values():
+ da = queue.dispatchers.get(self.UUID)
+ if da is not None and da.activated:
+ da.deactivate()
+ zc.async.utils.try_transaction_five_times(
+ deactivate_das, identifier, transaction)
finally:
transaction.abort()
self.conn.close()
Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/dispatcher.txt 2008-06-20 01:18:18 UTC (rev 87584)
@@ -8,29 +8,29 @@
must have several methods:
class IReactor(zope.interface.Interface):
-
+
def callFromThread(callable, *args, **kw):
"""have callable run in reactor's thread, by reactor, ASAP.
-
+
Intended to be called from a thread other than the reactor's main
loop.
"""
-
+
def callInThread(callable, *args, **kw):
"""have callable run in a separate thread, ASAP.
-
+
Must be called in same thread as reactor's main loop.
"""
-
+
def callLater(seconds, callable, *args, **kw):
"""have callable run in reactor at least <seconds> from now
-
+
Must be called in same thread as reactor's main loop.
"""
def addSystemEventTrigger(phase, event, callable, *args, **kw):
"""Install a callable to be run in phase of event.
-
+
must support phase 'before', and event 'shutdown'.
"""
@@ -140,7 +140,7 @@
- The queue fired events to announce the dispatcher's registration and
activation. We could have registered subscribers for either or both
of these events to create agents.
-
+
Note that the dispatcher in queue.dispatchers is a persistent
representative of the actual dispatcher: they are different objects.
@@ -161,17 +161,17 @@
True
- The dispatcher made its first ping. A ping means that the dispatcher changes
- a datetime to record that it is alive.
+ a datetime to record that it is alive.
- >>> queue.dispatchers[dispatcher.UUID].last_ping is not None
+ >>> queue.dispatchers[dispatcher.UUID].last_ping.value is not None
True
- The dispatcher needs to update its last_ping after every ``ping_interval``
- seconds. If it has not updated the last_ping after ``ping_death_interval``
- then the dispatcher is considered to be dead, and active jobs in the
- dispatcher's agents are ended (and given a chance to respond to that status
- change, so they can put themselves back on the queue to be restarted if
- desired).
+ The dispatcher needs to update its ``last_ping.value`` after every
+ ``ping_interval`` seconds. If it has not updated the ``last_ping.value``
+ after ``ping_death_interval`` then the dispatcher is considered to be dead,
+ and active jobs in the dispatcher's agents are ended (and given a chance to
+ respond to that status change, so they can put themselves back on the queue
+ to be restarted if desired).
>>> queue.dispatchers[dispatcher.UUID].ping_interval
datetime.timedelta(0, 30)
@@ -180,7 +180,7 @@
- We have some log entries. (We're using some magic log handlers inserted by
setup code in tests.py here.)
-
+
>>> print event_logs # doctest: +ELLIPSIS
zc.async.events INFO
attempting to activate dispatcher ...
@@ -282,8 +282,8 @@
{'main':
{'active jobs': [], 'error': None,
'new jobs': [(..., 'unnamed')], 'len': 0, 'size': 3}}}
-
+
[#getPollInfo]_ Notice our ``new jobs`` from the poll and the log has a value
in it now. We can get some information about that job from the dispatcher.
@@ -350,8 +350,18 @@
>>> badjob.parent # doctest: +ELLIPSIS
<zc.async.agent.Agent object at 0x...>
- >>> len(badjob.parent)
- 0
+ >>> import time
+ >>> for i in range(60):
+ ... if len(badjob.parent) == 0:
+ ... print 'yay'
+ ... break
+ ... else:
+ ... time.sleep(0.1)
+ ... _ = transaction.begin()
+ ... else:
+ ... print 'oops'
+ ...
+ yay
``zc.async.local`` also allows some fun tricks. Your callable can access the
queue, perhaps to put another job in.
@@ -371,7 +381,7 @@
of the last database sync for the thread's connection (at transaction
boundaries). This function is ``zc.async.local.getJob()``, and is seen below.
-It can also get and set job annotations *live, in another connection*.
+It can also get and set job annotations *live, in another connection*.
This allows you to send messages about job progress, or get live
information about whether you should change or stop your work, for
instance.
@@ -452,8 +462,8 @@
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
... 'main.fs', create=True)
- >>> from ZODB.DB import DB
- >>> db = DB(storage)
+ >>> from ZODB.DB import DB
+ >>> db = DB(storage)
>>> conn = db.open()
>>> root = conn.root()
>>> import zc.async.configure
@@ -461,7 +471,7 @@
.. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
find this poll information also.
-
+
>>> dispatcher.getPollInfo(at=poll.key) is poll
True
>>> dispatcher.getPollInfo(at=poll.utc_timestamp) is poll
@@ -480,7 +490,7 @@
.. [#show_error] OK, so you want to see a verbose traceback? OK, you asked
for it. We're eliding more than 90% of this, and this is a small one,
believe it or not. Rotate your logs!
-
+
Notice that all of the values in the logs are reprs.
>>> bad_job = queue.put(
@@ -489,8 +499,8 @@
>>> wait_for_result(bad_job)
<zc.twist.Failure exceptions.TypeError>
-
- >>> for r in reversed(trace_logs.records):
+
+ >>> for r in reversed(event_logs.records):
... if r.levelname == 'ERROR':
... break
... else:
@@ -500,12 +510,9 @@
<zc.async.job.Job (oid ..., db 'unnamed')
``<built-in function mul>(14, None)``> failed with traceback:
*--- Failure #... (pickled) ---
- .../zc/async/job.py:...: _call_with_retry(...)
+ .../zc/async/job.py:...: __call__(...)
[ Locals ]...
( Globals )...
- .../zc/async/job.py:...: <lambda>(...)
- [ Locals ]...
- ( Globals )...
exceptions.TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'
*--- End of Failure #... ---
<BLANKLINE>
Modified: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/interfaces.py 2008-06-20 01:18:18 UTC (rev 87584)
@@ -28,12 +28,12 @@
except ImportError:
class IDatabaseOpenedEvent(zope.interface.Interface):
"""The main database has been opened."""
-
+
database = zope.interface.Attribute("The main database.")
-
+
class DatabaseOpened(object):
zope.interface.implements(IDatabaseOpenedEvent)
-
+
def __init__(self, database):
self.database = database
@@ -51,31 +51,31 @@
class IReactor(zope.interface.Interface):
"""This describes what the dispatcher expects of the reactor.
-
+
The reactor does not need to actually provide this interface."""
-
+
def callFromThread(callable, *args, **kw):
"""have callable run in reactor's thread, by reactor, ASAP.
-
+
Intended to be called from a thread other than the reactor's main
loop.
"""
-
+
def callInThread(callable, *args, **kw):
"""have callable run in a separate thread, ASAP.
-
+
Must be called in same thread as reactor's main loop.
"""
-
+
def callLater(seconds, callable, *args, **kw):
"""have callable run in reactor at least <seconds> from now
-
+
Must be called in same thread as reactor's main loop.
"""
def addSystemEventTrigger(phase, event, callable, *args, **kw):
"""Install a callable to be run in phase of event.
-
+
must support phase 'before', and event 'shutdown'.
"""
@@ -83,10 +83,35 @@
"""run callable now if running, or when started.
"""
+class IRetryPolicy(zope.interface.Interface):
+ def jobError(failure, data_cache):
+ """whether and how to retry after an error while performing job.
+ return boolean as to whether to retry, or a datetime or timedelta to
+ reschedule the job in the queue. An empty timedelta means to rescedule
+ for immediately, before any pending calls in the queue."""
+
+ def commitError(failure, data_cache):
+ """whether to retry after trying to commit a job's successful result.
+
+ return boolean as to whether to retry, or a datetime or timedelta to
+ reschedule the job in the queue. An empty timedelta means to reschedule
+ immediately, before any pending calls in the queue."""
+
+ def interrupted():
+ """whether to retry after a dispatcher dies when job was in progress.
+
+ return boolean as to whether to retry, or a datetime or timedelta to
+ reschedule the job in the queue. An empty timedelta means to reschedule
+ immediately, before any pending calls in the queue."""
+
+ def updateData(data_cache):
+ """right before committing a job, retry is given a chance to stash
+ information it has saved in the data_cache."""
+
class IObjectEvent(zope.interface.Interface):
"""Event happened to object"""
-
+
object = zope.interface.Attribute('the object')
class AbstractObjectEvent(object):
@@ -136,9 +161,13 @@
class AbortedError(Exception):
"""An explicit abort, as generated by the default behavior of
- IJob.fail"""
+ IJob.handleInterrupt"""
+class TimeoutError(Exception):
+ """A time out caused by a ``begin_by`` value."""
+
+
class BadStatusError(Exception):
"""The job is not in the status it should be for the call being made.
This is almost certainly a programmer error."""
@@ -168,7 +197,7 @@
"""One of constants defined in zc.async.interfaces:
NEW, PENDING, ASSIGNED, ACTIVE, CALLBACKS, COMPLETED.
- NEW means not added to a queue and not yet called.
+ NEW means not added to a queue and not yet called.
PENDING means addded to a queue but not an agent, and not yet called.
ASSIGNED means added to an agent and not yet called.
ACTIVE means in the process of being called.
@@ -207,10 +236,8 @@
self.args for the call, and any kwargs effectively update self.kwargs
for the call."""
- def fail(e=AbortedError):
- """Fail this job, with option error e. May only be called when
- job is in PENDING or ACTIVE states, or else raises BadStatusError.
- If e is not provided,"""
+ def handleInterrupt():
+ """use IRetryPolicy to decide whether to abort."""
def resumeCallbacks():
"""Make all callbacks remaining for this job. Any callbacks
@@ -229,7 +256,7 @@
# """a set of selected worker UUIDs. If it is empty, it is
# interpreted as the set of all available workerUUIDs. Only
# workers with UUIDs in the set may perform it.
-#
+#
# If a worker would have selected this job for a run, but the
# difference of selected_workerUUIDs and excluded_workerUUIDs
# stopped it, it is responsible for verifying that the effective
@@ -250,26 +277,26 @@
class IAgent(zope.interface.common.sequence.IFiniteSequence):
"""Responsible for picking jobs and keeping track of them.
-
+
An agent is a persistent object in a queue that is associated with a
dispatcher and is responsible for picking jobs and keeping track of
them. Zero or more agents within a queue can be associated with a
- dispatcher.
-
+ dispatcher.
+
Each agent for a given dispatcher is identified uniquely with a
name. A fully (universally) unique identifier for the agent can be
obtained by combining the key of the agent's queue in the main queue
mapping at the ZODB root; the UUID of the agent's dispatcher; and
the agent's name.
"""
-
+
size = zope.interface.Attribute(
"""The maximum number of jobs this agent should have active at a time.
""")
name = zope.interface.Attribute(
"""The name for this agent. Unique within its dispatcher's jobs for
- its queue. Can be used to obtain agent with
+ its queue. Can be used to obtain agent with
queue.dispatchers[*dispatcher UUID*][*name*].""")
completed = zope.interface.Attribute(
@@ -305,9 +332,9 @@
Rememeber that IJobs are not guaranteed to be run in order
added to a queue. If you need sequencing, use
IJob.addCallbacks.
-
+
item must be an IJob, or be adaptable to that interface.
- begin_after must be None (to leave the job's current value) or a
+ begin_after must be None (to leave the job's current value) or a
datetime.datetime. begin_by must be None (to leave it alone) or a
datetime.timedelta of the duration after the begin_after.
@@ -326,11 +353,14 @@
queue on the `assignerUUID`.
"""
+ def putBack(item):
+ """Return a previously claimed job to the top of the queue."""
+
def pull(index=0):
"""Remove and return a job, by default from the front of the queue.
Raise IndexError if index does not exist.
-
+
This is the blessed way to remove an unclaimed job from the queue so
that dispatchers will not try to perform it.
"""
@@ -356,7 +386,7 @@
def ping(UUID):
"""responsible for setting ping time if necessary for this
dispatcher agent, and for decomissioning dead dispatchers for
- the next highest dispatcher (sorted by UUID) if its (last_ping +
+ the next highest dispatcher (sorted by UUID) if its (last_ping.value +
ping_interval + ping_death_interval) < now. If this is the
highest dispatcher UUID, cycles around to lowest."""
Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/job.py 2008-06-20 01:18:18 UTC (rev 87584)
@@ -11,11 +11,14 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
+import time
import types
import datetime
+import logging
import BTrees.OOBTree
import ZODB.POSException
+import ZEO.Exceptions
import transaction.interfaces
import persistent
import persistent.list
@@ -74,6 +77,131 @@
elif status == zc.async.interfaces.CALLBACKS:
a.resumeCallbacks()
+class RetryCommonFourTimes(persistent.Persistent): # default
+ zope.component.adapts(zc.async.interfaces.IJob)
+ zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+
+ # exceptions, data_cache key, max retry, initial backoff seconds,
+ # incremental backoff seconds, max backoff seconds
+ internal_exceptions = (
+ ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
+ None, 5, 5, 60),
+ ((ZODB.POSException.TransactionError,), 'transaction_error',
+ 5, 0, 0, 0),
+ )
+ transaction_exceptions = internal_exceptions
+ max_interruptions = 9
+ log_every = 5
+
+ def __init__(self, job):
+ self.parent = self.__parent__ = job
+ self.data = BTrees.family32.OO.BTree()
+
+ def updateData(self, data_cache):
+ if 'first_active' in self.data and 'first_active' in data_cache:
+ data_cache.pop('first_active')
+ self.data.update(data_cache)
+
+ def jobError(self, failure, data_cache):
+ return self._process(failure, data_cache, self.internal_exceptions)
+
+ def commitError(self, failure, data_cache):
+ return self._process(failure, data_cache, self.transaction_exceptions)
+
+ def _process(self, failure, data_cache, exceptions):
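+ # Walk the table of (exceptions, key, max count, backoffs) entries; on a
+ # match, record the failure and a per-key count in data_cache, sleep for
+ # the computed backoff if any, and report whether to attempt the job again.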
+ for (exc, key, max_count, init_backoff,
+ incr_backoff, max_backoff) in exceptions:
+ if failure.check(*exc) is not None:
+ count = data_cache.get(key, 0) + 1
+ if max_count is not None and count >= max_count:
+ zc.async.utils.tracelog.info(
+ 'Retry policy for job %r is not retrying after %d '
+ 'counts of %s occurrences', self.parent, count, key)
+ return False
+ elif count==1 or not count % self.log_every:
+ zc.async.utils.tracelog.info(
+ 'Retry policy for job %r requests another attempt '
+ 'after %d counts of %s occurrences', self.parent,
+ count, key)
+ backoff = min(max_backoff,
+ (init_backoff + (count-1) * incr_backoff))
+ if backoff:
+ time.sleep(backoff)
+ data_cache[key] = count
+ data_cache['last_' + key] = failure
+ if 'first_active' not in data_cache:
+ data_cache['first_active'] = self.parent.active_start
+ return True
+ return False
+
+ def interrupted(self):
+ if 'first_active' not in self.data:
+ self.data['first_active'] = self.parent.active_start
+ count = self.data['interruptions'] = self.data.get('interruptions', 0) + 1
+ if self.max_interruptions is None or count <= self.max_interruptions:
+ if count==1 or not count % self.log_every:
+ zc.async.utils.tracelog.info(
+ 'Retry policy for job %r requests another attempt '
+ 'after %d interrupts', self.parent, count)
+ return True
+ else:
+ zc.async.utils.tracelog.info(
+ 'Retry policy for job %r is not retrying after %d '
+ 'interrupts', self.parent, count)
+ return False
+
+
+class RetryCommonForever(RetryCommonFourTimes):
+ # retry ZEO disconnects and transaction errors during the job forever;
+ # also retry commit errors and interruptions forever.
+ internal_exceptions = (
+ ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
+ None, 5, 5, 60),
+ ((ZODB.POSException.TransactionError,), 'transaction_error',
+ None, 0, 0, 0),
+ )
+
+ max_interruptions = None
+
+ def commitError(self, failure, data_cache):
+ res = super(RetryCommonForever, self).commitError(failure, data_cache)
+ if not res:
+ # that just means we didn't record it. We actually are going to
+ # retry.
+ key = 'other'
+ data_cache['other'] = data_cache.get('other', 0) + 1
+ data_cache['last_other'] = failure
+ if 'first_active' not in data_cache:
+ data_cache['first_active'] = self.parent.active_start
+ return True # always retry
+
+class NeverRetry(persistent.Persistent):
+ zope.component.adapts(zc.async.interfaces.IJob)
+ zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+
+ def __init__(self, job):
+ self.parent = self.__parent__ = job
+
+ def updateData(self, data_cache):
+ pass
+
+ def jobError(self, failure, data_cache):
+ return False
+
+ def commitError(self, failure, data_cache):
+ return False
+
+ def interrupted(self):
+ return False
+
+def callback_retry_policy_factory(job):
+ res = zope.component.queryAdapter(
+ job, zc.async.interfaces.IRetryPolicy, 'callback')
+ if res is None:
+ res = RetryCommonForever(job)
+ return res
+
+
class Job(zc.async.utils.Base):
zope.interface.implements(zc.async.interfaces.IJob)
@@ -82,7 +210,11 @@
_status = zc.async.interfaces.NEW
_begin_after = _begin_by = _active_start = _active_end = None
key = None
-
+ _retry_policy = None
+ retry_policy_factory = None # effectively "look up IRetryPolicy adapter
+ # for '' (empty string) name, and use RetryCommonFourTimes if the adapter
+ # doesn't exist"
+ failure_log_level = None # effectively logging.ERROR
assignerUUID = None
_quota_names = ()
@@ -227,7 +359,8 @@
@rwproperty.setproperty
def callable(self, value):
# can't pickle/persist methods by default as of this writing, so we
- # add the sugar ourselves
+ # add the sugar ourselves. In future, would like for args to be
+ # potentially methods of persistent objects too...
if self._status != zc.async.interfaces.NEW:
raise zc.async.interfaces.BadStatusError(
'can only set callable when a job has NEW, PENDING, or '
@@ -243,12 +376,31 @@
if zc.async.interfaces.IJob.providedBy(self._callable_root):
self._callable_root.parent = self
- def addCallbacks(self, success=None, failure=None):
+ def addCallbacks(self, success=None, failure=None,
+ failure_log_level=None, retry_policy_factory=None):
if success is not None or failure is not None:
if success is not None:
success = zc.async.interfaces.IJob(success)
+ if failure_log_level is not None:
+ success.failure_log_level = failure_log_level
+ elif success.failure_log_level is None:
+ success.failure_log_level = logging.CRITICAL
+ if retry_policy_factory is not None:
+ success.retry_policy_factory = retry_policy_factory
+ elif success.retry_policy_factory is None:
+ success.retry_policy_factory = (
+ callback_retry_policy_factory)
if failure is not None:
failure = zc.async.interfaces.IJob(failure)
+ if failure_log_level is not None:
+ failure.failure_log_level = failure_log_level
+ elif failure.failure_log_level is None:
+ failure.failure_log_level = logging.CRITICAL
+ if retry_policy_factory is not None:
+ failure.retry_policy_factory = retry_policy_factory
+ elif failure.retry_policy_factory is None:
+ failure.retry_policy_factory = (
+ callback_retry_policy_factory)
res = Job(success_or_failure, success, failure)
if success is not None:
success.parent = res
@@ -260,12 +412,14 @@
abort_handler = zc.async.interfaces.IJob(
completeStartedJobArguments)
abort_handler.args.append(res)
- res.addCallback(abort_handler)
+ res.addCallback(
+ abort_handler, failure_log_level, retry_policy_factory)
else:
res = self
return res
- def addCallback(self, callback):
+ def addCallback(self, callback, failure_log_level=None,
+ retry_policy_factory=None):
callback = zc.async.interfaces.IJob(callback)
self.callbacks.put(callback)
callback.parent = self
@@ -274,96 +428,289 @@
else:
self._p_changed = True # to try and fire conflict errors if
# our reading of self.status has changed beneath us
+ if failure_log_level is not None:
+ callback.failure_log_level = failure_log_level
+ elif callback.failure_log_level is None:
+ callback.failure_log_level = logging.CRITICAL
+ if retry_policy_factory is not None:
+ callback.retry_policy_factory = retry_policy_factory
+ elif callback.retry_policy_factory is None:
+ callback.retry_policy_factory = callback_retry_policy_factory
return callback
+ def getRetryPolicy(self):
+ if self._retry_policy is not None:
+ return self._retry_policy
+ if self.retry_policy_factory is None:
+ # first try to look up adapter with name of ''; then if that fails
+ # use RetryCommonFourTimes
+ res = zope.component.queryAdapter(
+ self, zc.async.interfaces.IRetryPolicy, '')
+ if res is None:
+ res = RetryCommonFourTimes(self)
+ elif isinstance(self.retry_policy_factory, basestring):
+ res = zope.component.getAdapter(
+ self, zc.async.interfaces.IRetryPolicy,
+ self.retry_policy_factory)
+ # this may cause an error. We can't proceed because we don't know
+ # what to do, and it may be *critical* to know. Therefore, in
+ # _getRetry, we rely on never_fail to keep on sending critical
+ # errors in the log, and never stopping.
+ else:
+ res = self.retry_policy_factory(self)
+ self._retry_policy = res
+ return res
+
+ def _getRetry(self, call_name, tm, *args):
+ # if we are after the time that we are supposed to begin_by, no retry
+ if (self.begin_by is not None and self.begin_after is not None and
+ self.begin_by + self.begin_after < datetime.datetime.now(pytz.UTC)):
+ return False
+ # we divide up the two ``never_fail`` calls so that retries in getting
+ # the policy don't affect actually calling the method.
+ identifier = 'getting retry policy for %r' % (self,)
+ policy = zc.async.utils.never_fail(self.getRetryPolicy, identifier, tm)
+ call = getattr(policy, call_name, None)
+ if call is None:
+ zc.async.utils.log.error(
+ 'retry policy %r for %r does not have required %s method',
+ policy, self, call_name)
+ return None
+ identifier = 'getting result for %s retry for %r' % (call_name, self)
+ return zc.async.utils.never_fail(lambda: call(*args), identifier, tm)
+
def __call__(self, *args, **kwargs):
if self.status not in (zc.async.interfaces.NEW,
zc.async.interfaces.ASSIGNED):
raise zc.async.interfaces.BadStatusError(
'can only call a job with NEW or ASSIGNED status')
tm = transaction.interfaces.ITransactionManager(self)
- self._status = zc.async.interfaces.ACTIVE
- self._active_start = datetime.datetime.now(pytz.UTC)
- tm.commit()
- effective_args = list(args)
- effective_args[0:0] = self.args
- effective_kwargs = dict(self.kwargs)
- effective_kwargs.update(kwargs)
- return self._call_with_retry(
- lambda: self.callable(*effective_args, **effective_kwargs))
-
- def _call_with_retry(self, call):
- ct = 0
- tm = transaction.interfaces.ITransactionManager(self)
+ def prepare():
+ self._status = zc.async.interfaces.ACTIVE
+ self._active_start = datetime.datetime.now(pytz.UTC)
+ effective_args = list(args)
+ effective_args[0:0] = self.args
+ effective_kwargs = dict(self.kwargs)
+ effective_kwargs.update(kwargs)
+ return effective_args, effective_kwargs
+ identifier = 'preparing for call of %r' % (self,)
+ effective_args, effective_kwargs = zc.async.utils.never_fail(
+ prepare, identifier, tm)
+ # this is the calling code. It is complex and long because it is
+ # trying both to handle exceptions reasonably, and to honor the
+ # IRetryPolicy interface for those exceptions.
+ data_cache = {}
res = None
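+ # Each pass through the loop below runs the callable; on an error the
+ # retry policy's ``jobError`` decides whether to retry immediately,
+ # reschedule via the queue, or give up, and ``commitError`` plays the
+ # same role if committing the result fails.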
while 1:
try:
- res = call()
- if zc.async.interfaces.IJob.providedBy(res):
- res.addCallback(self._callback)
- tm.commit()
- elif isinstance(res, twisted.internet.defer.Deferred):
- res.addBoth(zc.twist.Partial(self._callback))
- tm.commit()
- else:
- res = self._complete(res, tm)
- except ZODB.POSException.TransactionError:
+ res = self.callable(*effective_args, **effective_kwargs)
+ except zc.async.utils.EXPLOSIVE_ERRORS:
tm.abort()
- ct += 1
- if ct >= 5:
- res = self._complete(zc.twist.Failure(), tm)
- self.resumeCallbacks()
- else:
+ raise
+ except:
+ res = zc.twist.Failure()
+ tm.abort()
+ retry = self._getRetry('jobError', tm, res, data_cache)
+ if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+ identifier = (
+ 'rescheduling %r as requested by '
+ 'associated IRetryPolicy %r' % (
+ self, self.getRetryPolicy()))
+ if self is zc.async.utils.never_fail(
+ lambda: self._reschedule(retry, data_cache),
+ identifier, tm):
+ return self
+ elif retry:
continue
- except zc.twist.EXPLOSIVE_ERRORS:
+ # policy didn't exist or returned False or couldn't reschedule
+ try:
+ callback = self._set_result(res, tm, data_cache)
+ except zc.async.utils.EXPLOSIVE_ERRORS:
tm.abort()
raise
except:
+ failure = zc.twist.Failure()
tm.abort()
- res = self._complete(zc.twist.Failure(), tm)
- self.resumeCallbacks()
+ retry = self._getRetry('commitError', tm, failure, data_cache)
+ if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+ identifier = (
+ 'rescheduling %r as requested by '
+ 'associated IRetryPolicy %r' % (
+ self, self.getRetryPolicy()))
+ if self is zc.async.utils.never_fail(
+ lambda: self._reschedule(retry, data_cache),
+ identifier, tm):
+ return self
+ elif retry:
+ continue
+ # policy didn't exist or returned False or couldn't reschedule
+ if isinstance(res, twisted.python.failure.Failure):
+ log_level = self.failure_log_level
+ if log_level is None:
+ log_level = logging.ERROR
+ zc.async.utils.log.log(
+ log_level,
+ 'Commit failed for %r (see subsequent traceback). '
+ 'Prior to this, job failed with traceback:\n%s',
+ self,
+ res.getTraceback(
+ elideFrameworkCode=True, detail='verbose'))
+ else:
+ zc.async.utils.log.info(
+ 'Commit failed for %r (see subsequent traceback). '
+ 'Prior to this, job succeeded with result: %r',
+ self, res)
+ res = failure
+ def complete():
+ self._result = res
+ self._status = zc.async.interfaces.CALLBACKS
+ self._active_end = datetime.datetime.now(pytz.UTC)
+ policy = self.getRetryPolicy()
+ if data_cache and self._retry_policy is not None:
+ self._retry_policy.updateData(data_cache)
+ identifier = 'storing failure at commit of %r' % (self,)
+ zc.async.utils.never_fail(complete, identifier, tm)
+ callback = True
+ if callback:
+ self._log_completion(res)
+ identifier = 'performing callbacks of %r' % (self,)
+ zc.async.utils.never_fail(self.resumeCallbacks, identifier, tm)
+ return res
+
+ def handleInterrupt(self):
+ # should be called within a job that has a RetryCommonForever policy
+ tm = transaction.interfaces.ITransactionManager(self)
+ if self.status == zc.async.interfaces.ACTIVE:
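+ # The dispatcher died while this job was running: ask the retry policy
+ # whether to run the job again and, if so, put it back in the queue (a
+ # bare ``True`` means "as soon as possible"); otherwise the job fails
+ # with an AbortedError below.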
+ retry = self._getRetry('interrupted', tm)
+ if isinstance(retry, (datetime.datetime, datetime.timedelta)):
+ self._reschedule(retry, queue=self.queue)
+ elif retry:
+ self._reschedule(datetime.timedelta(), queue=self.queue)
else:
- if self._status == zc.async.interfaces.CALLBACKS:
+ res = zc.twist.Failure(zc.async.interfaces.AbortedError())
+ if self._set_result(res, tm):
self.resumeCallbacks()
- return res
+ self._log_completion(res)
+ elif self.status != zc.async.interfaces.CALLBACKS:
+ # we have to allow CALLBACKS or else some retries will fall over,
+ # because handleInterrupt may fail after a commit of the aborted
+ # error
+ raise zc.async.interfaces.BadStatusError(
+ 'can only call ``handleInterrupt`` on a job with ACTIVE '
+ 'status') # um...or CALLBACKS, but that's a secret :-D
+ else:
+ self.resumeCallbacks()
- def _callback(self, res):
- self._call_with_retry(lambda: res)
+ def fail(self, e=None):
+ # something may have fallen over the last time this was called, so we
+ # are careful to only store the error if we're not in the CALLBACKS
+ # status.
+ callback = True
+ status = self.status
+ if status in (zc.async.interfaces.COMPLETED,
+ zc.async.interfaces.ACTIVE):
+ raise zc.async.interfaces.BadStatusError(
+ 'can only call fail on a job with NEW, PENDING, or ASSIGNED '
+ 'status') # ...or CALLBACKS, but that's because of
+ # retries, and is semantically incorrect
+ if status != zc.async.interfaces.CALLBACKS:
+ if e is None:
+ e = zc.async.interfaces.TimeoutError()
+ res = zc.twist.Failure(e)
+ callback = self._set_result(
+ res, transaction.interfaces.ITransactionManager(self))
+ self._log_completion(res)
+ if callback:
+ self.resumeCallbacks()
- def _complete(self, res, tm):
- if isinstance(res, twisted.python.failure.Failure):
- res = zc.twist.sanitize(res)
- failure = True
+ def _reschedule(self, when, data_cache=None, queue=None):
+ if not isinstance(when, (datetime.datetime, datetime.timedelta)):
+ raise TypeError('``when`` must be datetime or timedelta')
+ in_agent = zc.async.interfaces.IAgent.providedBy(self.parent)
+ if queue is None:
+ # this is a reschedule from jobError or commitError
+ if not in_agent:
+ zc.async.utils.log.critical(
+ 'error for IRetryPolicy %r on %r: '
+ 'can only reschedule a job directly in an agent',
+ self.getRetryPolicy(), self)
+ return None
+ queue = self.queue
+ if data_cache is not None and self._retry_policy is not None:
+ self._retry_policy.updateData(data_cache)
+ self._status = zc.async.interfaces.NEW
+ self._active_start = None
+ if in_agent:
+ self.parent.remove(self)
else:
- failure = False
- self._result = res
- self._status = zc.async.interfaces.CALLBACKS
- self._active_end = datetime.datetime.now(pytz.UTC)
+ self.parent = None
+ now = datetime.datetime.now(pytz.UTC)
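+ # A datetime that is not in the future, or a non-positive timedelta,
+ # means "retry as soon as possible": ``putBack`` returns the job to the
+ # top of the queue, ahead of other pending jobs.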
+ if isinstance(when, datetime.datetime):
+ if when.tzinfo is None:
+ when = when.replace(tzinfo=pytz.UTC)
+ if when <= now:
+ queue.putBack(self)
+ else:
+ queue.put(self, begin_after=when)
+ elif isinstance(when, datetime.timedelta):
+ if when <= datetime.timedelta():
+ queue.putBack(self)
+ else:
+ queue.put(self, begin_after=now+when)
+ return self
+
+ def _set_result(self, res, tm, data_cache=None):
+ # returns whether to call ``resumeCallbacks``
+ callback = True
+ if zc.async.interfaces.IJob.providedBy(res):
+ res.addCallback(self._callback)
+ callback = False
+ elif isinstance(res, twisted.internet.defer.Deferred):
+ partial = zc.twist.Partial(self._callback)
+ partial.max_transaction_errors = None # retry conflicts forever
+ res.addBoth(partial)
+ callback = False
+ else:
+ if isinstance(res, twisted.python.failure.Failure):
+ res = zc.twist.sanitize(res)
+ self._result = res
+ self._status = zc.async.interfaces.CALLBACKS
+ self._active_end = datetime.datetime.now(pytz.UTC)
+ if self._retry_policy is not None and data_cache:
+ self._retry_policy.updateData(data_cache)
tm.commit()
- if failure:
- zc.async.utils.tracelog.error(
+ return callback
+
+ def _log_completion(self, res):
+ if isinstance(res, twisted.python.failure.Failure):
+ log_level = self.failure_log_level
+ if log_level is None:
+ log_level = logging.ERROR
+ zc.async.utils.log.log(
+ log_level,
'%r failed with traceback:\n%s',
self,
- res.getTraceback(elideFrameworkCode=True, detail='verbose'))
+ res.getTraceback(
+ elideFrameworkCode=True, detail='verbose'))
else:
zc.async.utils.tracelog.info(
- '%r succeeded with result:\n%r',
+ '%r succeeded with result: %r',
self, res)
- return res
- def fail(self, e=None):
- if e is None:
- e = zc.async.interfaces.AbortedError()
- if self._status not in (zc.async.interfaces.NEW,
- zc.async.interfaces.ACTIVE):
- raise zc.async.interfaces.BadStatusError(
- 'can only call fail on a job with NEW, PENDING, ASSIGNED, or '
- 'ACTIVE status')
- self._complete(zc.twist.Failure(e),
- transaction.interfaces.ITransactionManager(self))
- self.resumeCallbacks()
+ def _callback(self, res):
+ # done within a job or partial, so we can rely on their retry bits to
+ # some degree. However, we commit transactions ourselves, so we have
+ # to be a bit careful that the result hasn't been set already.
+ callback = True
+ if self._status == zc.async.interfaces.ACTIVE:
+ callback = self._set_result(
+ res, transaction.interfaces.ITransactionManager(self))
+ self._log_completion(res)
+ if callback:
+ self.resumeCallbacks()
def resumeCallbacks(self):
+ # should be called within a job that has a RetryCommonForever policy
if self._status != zc.async.interfaces.CALLBACKS:
raise zc.async.interfaces.BadStatusError(
'can only resumeCallbacks on a job with CALLBACKS status')
@@ -372,34 +719,53 @@
length = 0
while 1:
for j in callbacks:
+ # TODO yuck: this mucks in callbacks' protected bits
if j._status == zc.async.interfaces.NEW:
- zc.async.utils.tracelog.debug(
- 'starting callback %r to %r', j, self)
- j(self.result)
+ if (j.begin_by is not None and
+ (j.begin_after + j.begin_by) <
+ datetime.datetime.now(pytz.UTC)):
+ zc.async.utils.log.error(
+ 'failing expired callback %r to %r', j, self)
+ j.fail()
+ else:
+ zc.async.utils.tracelog.debug(
+ 'starting callback %r to %r', j, self)
+ j(self.result)
elif j._status == zc.async.interfaces.ACTIVE:
- zc.async.utils.tracelog.debug(
- 'failing aborted callback %r to %r', j, self)
- j.fail()
+ retry = j._getRetry('interrupted', tm)
+ istime = isinstance(
+ retry, (datetime.timedelta, datetime.datetime))
+ if istime:
+ zc.async.utils.log.error(
+ 'error for IRetryPolicy %r on %r: '
+ 'cannot reschedule a callback, only retry',
+ j.getRetryPolicy(), j)
+ if retry or istime:
+ zc.async.utils.tracelog.debug(
+ 'retrying interrupted callback '
+ '%r to %r', j, self)
+ j._status = zc.async.interfaces.NEW
+ j._active_start = None
+ j(self.result)
+ else:
+ zc.async.utils.tracelog.debug(
+ 'aborting interrupted callback '
+ '%r to %r', j, self)
+ j.fail(zc.async.interfaces.AbortedError())
elif j._status == zc.async.interfaces.CALLBACKS:
j.resumeCallbacks()
# TODO: this shouldn't raise anything we want to catch, right?
# now, this should catch all the errors except EXPLOSIVE_ERRORS
# cleaning up dead jobs should look something like the above.
- tm.commit()
tm.begin() # syncs
# it's possible that someone added some callbacks, so run until
# we're exhausted.
length += len(callbacks)
callbacks = list(self.callbacks)[length:]
if not callbacks:
- try:
- self._status = zc.async.interfaces.COMPLETED
- if zc.async.interfaces.IAgent.providedBy(self.parent):
- self.parent.jobCompleted(self)
- tm.commit()
- except ZODB.POSException.TransactionError:
- tm.abort()
- callbacks = list(self.callbacks)[length:]
- else:
- break # and return
-
+ # this whole method is called within a never_fail...
+ self._status = zc.async.interfaces.COMPLETED
+ if zc.async.interfaces.IAgent.providedBy(self.parent):
+ self.parent.jobCompleted(self)
+ tm.commit()
+ return
Modified: zc.async/trunk/src/zc/async/job.txt
===================================================================
--- zc.async/trunk/src/zc/async/job.txt 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/job.txt 2008-06-20 01:18:18 UTC (rev 87584)
@@ -2,34 +2,34 @@
Jobs
====
-What if you want to persist a reference to the method of a persistent
-object--you can't persist that normally in the ZODB, but that can be
-very useful, especially to store asynchronous calls. What if you want
-to act on the result of an asynchronous call that may be called later?
-The zc.async package offers an approach that combines ideas of a partial
-and that of Twisted deferred code: ``zc.async.job.Job``.
+What if you want to persist a reference to the method of a persistent object?
+You can't persist that normally in the ZODB, but that can be very useful,
+especially to store asynchronous calls. What if you want to act on the result
+of an asynchronous call that may be called later?
-To use it, simply wrap the callable--a method of a persistent object or
-a callable persistent object or a global function--in the job. You can
-include ordered and keyword arguments to the job, which may be
-persistent objects or simply pickleable objects.
+The zc.async package offers an approach that combines ideas of a partial and
+that of a Twisted deferred: ``zc.async.job.Job``. It has code that is specific
+to zc.async, so it is not truly a general-purpose persistent partial, but this
+file shows the Job largely stand-alone.
-Unlike a partial but like a Twisted deferred, the result of the wrapped
-call goes on the job's ``result`` attribute, and the immediate return of
-the call might not be the job's end result. It could also be a failure,
-indicating an exception; or another partial, indicating that we are
-waiting to be called back by the second partial; or a twisted deferred,
-indicating that we are waiting to be called back by a twisted Deferred
-(see the ``zc.twist``). After you have the partial, you can then use a
-number of methods and attributes on the partial for further set up.
-Let's show the most basic use first, though.
+To use a job, simply wrap the callable--a method of a persistent object or a
+callable persistent object or a global function--in the job. You can include
+ordered and keyword arguments to the job, which may be persistent objects or
+simply pickleable objects.
-Note that, even though this looks like an interactive prompt, all
-functions and classes defined in this document act as if they were
-defined within a module. Classes and functions defined in an interactive
-prompt are normally not picklable, and Jobs must work with
-picklable objects [#set_up]_.
+Unlike a partial but like a Twisted deferred, the result of the wrapped call
+goes on the job's ``result`` attribute. It could also be a failure, indicating
+an exception; or, temporarily, None, indicating that we are waiting to be called
+back by a second Job or a twisted Deferred (see the ``zc.twist`` package).
+After you have the job, you can then use a number of methods and attributes
+on the job for further setup. Let's show the most basic use first, though.
+
+(Note that, even though this looks like an interactive prompt, all functions and
+classes defined in this document act as if they were defined within a module.
+Classes and functions defined in an interactive prompt are normally not
+picklable, and Jobs must work with picklable objects [#set_up]_.)
+
>>> import zc.async.job
>>> def call():
... print 'hello world'
@@ -52,8 +52,8 @@
>>> j.status == zc.async.interfaces.NEW
True
-We can call the job from the NEW (or ASSIGNED, see later) status, and
-then see that the function was called, and see the result on the partial.
+We can call the job from the NEW (or PENDING or ASSIGNED, see later) status,
+and then see that the function was called, and see the result on the job.
>>> res = j()
hello world
@@ -69,8 +69,8 @@
'my result'
In addition to using a global function, we can also use a method of a
-persistent object. Imagine we have a ZODB root that we can put objects
-in to.
+persistent object. (For this example, imagine we have a ZODB root into which we
+can put objects.)
>>> import persistent
>>> class Demo(persistent.Persistent):
@@ -117,13 +117,13 @@
...
exceptions.RuntimeError: Bad Things Happened Here
-Note that all calls can return a failure explicitly, rather than raising
-an exception that the job converts to an exception. However, there
-is an important difference in behavior. If a wrapped call raises an
-exception, the job aborts the transaction; but if the wrapped call
-returns a failure, no abort occurs. Wrapped calls that explicitly return
-failures are thus responsible for any necessary transaction aborts. See
-the footnote for an example [#explicit_failure_example]_.
+Note that all calls can return a failure explicitly, rather than raising an
+exception that the job converts to a failure. However, there is an important
+difference in behavior. If a wrapped call raises an exception, the job aborts
+the transaction; but if the wrapped call returns a failure, no automatic abort
+occurs. Wrapped calls that explicitly return failures are thus responsible for
+any necessary transaction aborts. See the footnote for an example
+[#explicit_failure_example]_.
Now let's return a job from the job. This generally represents a result
that is waiting on another asynchronous persistent call, which would
@@ -236,23 +236,24 @@
topic.
Callbacks
----------
+=========
-The job object can also be used to handle return values and
-exceptions from the call. The ``addCallbacks`` method enables the
-functionality. Its signature is (success=None, failure=None). It may
-be called multiple times, each time adding a success and/or failure
-callable that takes an end result: a value or a zc.async.Failure object,
-respectively. Failure objects are passed to failure callables, and
-any other results are passed to success callables.
+The job object can also be used to handle return values and exceptions from the
+call. The ``addCallbacks`` method enables the functionality. Its primary
+arguments are (success=None, failure=None). It may be called multiple times,
+each time adding a success and/or failure callable that takes the end result of
+the original, parent job: a value or a zc.twist.Failure object. Failure objects
+are passed to failure callables, and any other results are passed to success
+callables.
-The return value of the success and failure callables is
-important for chains and for determining whether a job had any
-errors that need logging, as we'll see below. The call to
-``addCallbacks`` returns a job, which can be used for chaining (see
-``Chaining Callbacks``_).
+Note that, unlike with Twisted deferreds, the results of callbacks for a given
+job are not chained. To chain, add a callback to the desired callback. The
+primary reason for chaining in Twisted is try:except:else:finally logic. In
+most cases, because zc.async jobs are long-running, the try:except logic can be
+accomplished within the code for the job itself. For certain kinds of
+exceptions, an IRetryPolicy is used instead.
-Let's look at a simple example.
+Let's look at a simple example of a callback.
>>> def call(*args):
... res = 1
@@ -333,6 +334,7 @@
failure. [Failure instance: Traceback: exceptions.TypeError...]
also a failure. [Failure instance: Traceback: exceptions.TypeError...]
+------------------
Chaining Callbacks
------------------
@@ -384,6 +386,7 @@
>>> isinstance(j.result, twisted.python.failure.Failure)
True
+--------------------------
Callbacks on Completed Job
--------------------------
@@ -403,6 +406,7 @@
>>> j.status == zc.async.interfaces.COMPLETED
True
+-------------
Chaining Jobs
-------------
@@ -422,33 +426,68 @@
success! 15
also a success! 60
-Failing
--------
+Failures
+========
-Speaking again of failures, it's worth discussing two other aspects of
-failing. One is that jobs offer an explicit way to fail a call. It
-can be called when the job has a NEW, PENDING, ASSIGNED or ACTIVE status.
-The primary use cases for this method are to cancel a job that is
-overdue to start, and to cancel a job that was in progress by a
-worker thread in a dispatcher when the dispatcher died (more on that below).
+Let's talk more about failures. We'll divide them up into two large types.
+
+- The code you ran in the job failed.
+
+- The system around your code (zc.async code, the ZODB, or even the machine on
+ which the job is running) failed.
+
+If your code fails, it's mostly up to you to deal with it. There are two
+mechanisms to use: callbacks, which we've already seen, and retry policies,
+which we'll look at below.
+
+If the system fails around your code, four sorts of things might happen. In the
+context of the full zc.async system, it is the responsibility of the zc.async
+code to react as described below, and, when pertinent, your responsibility to
+make sure that the retry policy for the job does what you need.
+
+- The job never started, and timed out. zc.async should call ``fail`` on the
+ job, which aborts the job and begins callbacks. (This is handled in the
+ queue's ``claim`` method, in the full context of zc.async.)
+
+- The job started but was unable to commit. Internally to the job's machinery,
+ the job uses a retry policy to decide what to do, as described below.
+
+- The job started but then was interrupted. zc.async should call
+ ``handleInterrupt``, in which a retry policy decides what to do, as described
+ below. (This is handled in a DispatcherAgents collection's ``deactivate``
+ method, in the full context of zc.async.)
+
+- The job completed, but the callbacks were interrupted. zc.async should call
+ ``resumeCallbacks``, which will handle the remaining callbacks in the manner
+ described here. (This is handled in a DispatcherAgents collection's
+ ``deactivate`` method, in the full context of zc.async.)
+
+We need to explore a few elements of this, then: the ``fail`` method, retry
+policies, job errors, commit errors, and the ``handleInterrupt`` method.
+
+--------
+``fail``
+--------
+
+The ``fail`` method is an explicit way to fail a job. It can be called when the
+job has a NEW, PENDING, or ASSIGNED status. The use case for this method is to
+cancel a job that is overdue to start. It defaults to a TimeoutError.
+
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> transaction.commit()
>>> j.fail()
>>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- Traceback (most recent call last):
- ...
- zc.async.interfaces.AbortedError:
+ Traceback...zc.async.interfaces.TimeoutError...
-``fail`` calls all failure callbacks with the failure.
+``fail`` calls all callbacks with the failure.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> j_callback = j.addCallbacks(failure=failure)
>>> transaction.commit()
>>> res = j.fail() # doctest: +ELLIPSIS
- failure. [Failure instance: Traceback...zc.async.interfaces.AbortedError...]
+ failure. [Failure instance: Traceback...zc.async.interfaces.TimeoutError...]
-As seen above, it fails with zc.async.interfaces.AbortedError by default.
+As seen above, it fails with zc.async.interfaces.TimeoutError by default.
You can also pass in a different error.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
@@ -459,43 +498,856 @@
...
exceptions.RuntimeError: failed
-As mentioned, if a dispatcher dies when working on an active task, the
-active task should be aborted using ``fail``, so the method also works if
-a job has the ACTIVE status. We'll reach under the covers to show this.
+It won't work for failing tasks in ACTIVE or COMPLETED status.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> j._status = zc.async.interfaces.ACTIVE
>>> transaction.commit()
>>> j.fail()
- >>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
- zc.async.interfaces.AbortedError:
+ BadStatusError: can only call fail on a job with NEW, PENDING, or ASSIGNED status
-It won't work for failing tasks in COMPLETED or CALLBACKS status.
-
+ >>> j._status = zc.async.interfaces.NEW
>>> j.fail()
+ >>> j.status == zc.async.interfaces.COMPLETED
+ True
+ >>> j.fail()
Traceback (most recent call last):
...
- BadStatusError: can only call fail on a job with NEW, PENDING, ASSIGNED, or ACTIVE status
+ BadStatusError: can only call fail on a job with NEW, PENDING, or ASSIGNED status
+
+It's a dirty secret that it will work in CALLBACKS status. This is because we
+need to support retries in the face of internal commits.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> j._status = zc.async.interfaces.CALLBACKS
+ >>> transaction.commit()
>>> j.fail()
- Traceback (most recent call last):
+ >>> j.status == zc.async.interfaces.COMPLETED
+ True
+ >>> j.result is None # the default
+ True
+
+--------------
+Retry Policies
+--------------
+
+All of the other failure situations touch on retry policies, directly or
+indirectly.
+
+What is a retry policy? It is used in three circumstances.
+
+- When the code the job ran fails, the job will try to call
+ ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to whether
+ the job should be retried.
+
+- When the commit fails, the job will try to call
+ ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
+ whether the job should be retried.
+
+- When the job starts but fails to complete because the system is interrupted,
+ the job will try to call ``retry_policy.interrupted()`` to get a boolean as
+ to whether the job should be retried.
+
+Why does this need to be a policy? Can't it be a simpler arrangement?
+
+The heart of the problem is that different jobs need different error
+resolutions.
+
+In some cases, jobs may not be fully transactional. For instance, the job
+may be communicating with an external system, such as a credit card system.
+The retry policy here should typically be "never": perhaps a callback should be
+in charge of determining what to do next.
+
+If a job is fully transactional, it can be retried. But even then the desired
+behavior may differ.
+
+- In typical cases, some errors should simply cause a failure, while other
+ errors, such as database conflict errors, should cause a limited number of
+ retries.
+
+- In some jobs, conflict errors should be retried forever, because the job must
+ be run to completion or else the system should fall over. Callbacks that try
+ to handle errors themselves may take this approach, for instance.
+
+zc.async currently ships with three retry policies.
+
+1. The default, appropriate for most fully transactional jobs, is the
+ zc.async.job.RetryCommonFourTimes.
+
+2. The other available (pre-written) option for transactional jobs is
+ zc.async.job.RetryCommonForever. Callbacks will get this policy by
+ default.
+
+Both of these policies retry ZEO disconnects forever. Transaction errors such
+as conflicts are retried either four times (for a total of five attempts) or
+forever, respectively, and interruptions either nine times (for a total of ten
+attempts) or forever.
+
+3. The last retry policy is zc.async.job.NeverRetry. This is appropriate for
+ non-transactional jobs. You'll still typically need to handle errors in
+ your callbacks.
+
+Let's take a detailed look at these three in isolation before seeing them in
+practice below.
+
+We'll start with the default retry policy, RetryCommonFourTimes. We can get it
+with ``getRetryPolicy``.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+ True
+ >>> verifyObject(zc.async.interfaces.IRetryPolicy, policy)
+ True
+
+Now we'll try out a few calls. ``jobError`` and ``commitError`` both take a
+failure and a dictionary to stash data about the retry. The job then calls
+the retry policy with the data dictionary before a commit using the
+``updateData`` method.
+
+Here's the policy requesting that a job be tried a total of five times,
+combined from in the job and in the commit
+
+ >>> import zc.twist
+ >>> import ZODB.POSException
+ >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+ >>> data = {}
+
+ >>> policy.jobError(conflict, data)
+ True
+ >>> policy.jobError(conflict, data)
+ True
+ >>> policy.jobError(conflict, data)
+ True
+ >>> policy.jobError(conflict, data)
+ True
+ >>> policy.jobError(conflict, data)
+ False
+
+Now we've expired the total number of retries for this kind of error, so during
+commit it will fail immediately.
+
+ >>> policy.commitError(conflict, data)
+ False
+
+We'll reset the data (which holds the information until ``updateData`` is
+called to store the information persistently, so that aborted transactions
+don't discard the history from past attempts) and try again.
+
+ >>> data = {}
+ >>> policy.commitError(conflict, data)
+ True
+ >>> policy.commitError(conflict, data)
+ True
+ >>> policy.commitError(conflict, data)
+ True
+ >>> policy.commitError(conflict, data)
+ True
+ >>> policy.commitError(conflict, data)
+ False
+
+A ZEO ClientDisconnected error will be retried forever. We treat 50 as close
+enough to "forever". It uses time.sleep to backoff. We'll stub that so we can
+see what happens.
+
+ >>> import time
+ >>> sleep_requests = []
+ >>> def sleep(i):
+ ... sleep_requests.append(i)
...
- BadStatusError: can only call fail on a job with NEW, PENDING, ASSIGNED, or ACTIVE status
+ >>> old_sleep = time.sleep
+ >>> time.sleep = sleep
-Using ``resumeCallbacks``
--------------------------
+ >>> import ZEO.Exceptions
+ >>> disconnect = zc.twist.Failure(ZEO.Exceptions.ClientDisconnected())
-So ``fail`` is the proper way to handle an active job that was being
-worked on by on eof a dead dispatcher's worker thread, but how does one
-handle a job that was in the CALLBACKS status? The answer is to use
-resumeCallbacks. Any job that is still pending will be called; any
-job that is active will be failed; any job that is in the middle
-of calling its own callbacks will have its ``resumeCallbacks`` called; and
-any job that is completed will be ignored.
+ >>> for i in range(50):
+ ... if not policy.jobError(disconnect, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 50
+After ``jobError`` has upped the backoff, it stays at 60 seconds for
+ClientDisconnected errors at commit.
+
+ >>> policy.commitError(disconnect, data)
+ True
+ >>> sleep_requests[50]
+ 60
+
+Here's the backoff happening in commitError.
+
+ >>> del sleep_requests[:]
+ >>> data = {}
+ >>> for i in range(50):
+ ... if not policy.commitError(disconnect, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 50
+
+If we encounter another kind of error, no retries are requested.
+
+ >>> runtime = zc.twist.Failure(RuntimeError())
+ >>> policy.jobError(runtime, data)
+ False
+ >>> policy.commitError(runtime, data)
+ False
+ >>> value = zc.twist.Failure(ValueError())
+ >>> policy.jobError(value, data)
+ False
+ >>> policy.commitError(value, data)
+ False
+
+When the system is interrupted, ``handleInterrupt`` uses the retry_policy's
+``interrupted`` method to determine what to do. It does not need to pass a
+data dictionary. The default policy will retry nine times, for a total of ten
+attempts.
+
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ True
+ >>> policy.interrupted()
+ False
+
+``updateData`` is only used after ``jobError`` and ``commitError``, and is
+called every time there is a commit attempt, so we're wildly out of order here,
+but we'll call the method now anyway to show we can, and then perform the job.
+
+ >>> transaction.commit()
+ >>> policy.updateData(data)
+ >>> j()
+ 10
+
+The policy may also want to perform additional logging.
+
+The other policies perform similarly. [#show_other_policies]_
+
+To change a policy on a job, you have two options. First, you can change the
+``retry_policy_factory`` on your instance.
+
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> j.retry_policy_factory = zc.async.job.NeverRetry
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.NeverRetry)
+ True
+
+Second, you can leverage the fact that the default policy tries to adapt the
+job to zc.async.interfaces.IRetryPolicy (with the default name of ''), and only
+if that fails does it use RetryCommonFourTimes.
+
+ >>> import zope.component
+ >>> zope.component.provideAdapter(
+ ... zc.async.job.RetryCommonForever,
+ ... provides=zc.async.interfaces.IRetryPolicy)
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonForever)
+ True
+
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+ ... zc.async.job.RetryCommonForever,
+ ... provided=zc.async.interfaces.IRetryPolicy) # tearDown
+ True
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+ True
+
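+You are not limited to the shipped policies, either. As a quick sketch (the
+``FlakyServiceError`` exception and ``RetryFlakyService`` class here are
+invented for illustration), you can subclass one of the common policies and
+extend its ``internal_exceptions`` table so that an application-specific error
+gets its own retry count and backoff:
+
+ >>> class FlakyServiceError(Exception):
+ ...     pass
+ ...
+ >>> class RetryFlakyService(zc.async.job.RetryCommonFourTimes):
+ ...     # same tuple format as the base class: exceptions, data_cache key,
+ ...     # max count, initial backoff, incremental backoff, max backoff
+ ...     internal_exceptions = (
+ ...         zc.async.job.RetryCommonFourTimes.internal_exceptions +
+ ...         (((FlakyServiceError,), 'flaky_service', 3, 1, 1, 5),))
+ ...     transaction_exceptions = internal_exceptions
+ ...
+ >>> sketch_job = zc.async.job.Job(multiply, 5, 2)
+ >>> sketch_job.retry_policy_factory = RetryFlakyService
+ >>> isinstance(sketch_job.getRetryPolicy(), RetryFlakyService)
+ True
+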
+If you are working with callbacks, the default retry policy is
+``RetryCommonForever``.
+
+ >>> def foo(result):
+ ... print result
+ ...
+ >>> callback = j.addCallback(foo)
+ >>> policy = callback.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonForever)
+ True
+
+This can be changed in the same kind of way as a non-callback job. You can
+set the ``retry_policy_factory``.
+
+ >>> callback = j.addCallback(foo)
+ >>> callback.retry_policy_factory = zc.async.job.NeverRetry
+ >>> policy = callback.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.NeverRetry)
+ True
+
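+(Both ``addCallback`` and ``addCallbacks`` also accept the factory directly as
+their ``retry_policy_factory`` argument; a quick sketch:)
+
+ >>> callback = j.addCallback(foo, retry_policy_factory=zc.async.job.NeverRetry)
+ >>> isinstance(callback.getRetryPolicy(), zc.async.job.NeverRetry)
+ True
+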
+You can also register an adapter. In this case it must be an adapter
+registered with the 'callback' name.
+
+ >>> zope.component.provideAdapter(
+ ... zc.async.job.RetryCommonFourTimes,
+ ... provides=zc.async.interfaces.IRetryPolicy, name='callback')
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> callback = j.addCallback(foo)
+ >>> policy = callback.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonFourTimes)
+ True
+
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+ ... zc.async.job.RetryCommonFourTimes,
+ ... provided=zc.async.interfaces.IRetryPolicy, name='callback') # tearDown
+ True
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> callback = j.addCallback(foo)
+ >>> policy = callback.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonForever)
+ True
+
+Finally, it is worth noting that, typically, once a retry policy has been
+obtained with ``getRetryPolicy``, changing the factory or adapter registration
+will not change the policy: it has already been instantiated and stored on the
+job.
+
+ >>> callback.retry_policy_factory = zc.async.job.NeverRetry
+ >>> isinstance(callback.getRetryPolicy(), zc.async.job.NeverRetry)
+ False
+ >>> policy is callback.getRetryPolicy()
+ True
+
+Now we'll look at the standard retry policies in use as we examine more failure
+scenarios below.
+
+---------------------
+Failures in Your Code
+---------------------
+
+As seen above, failures in your code will generally be ignored by the default
+retry policy, and passed on as a Failure object on the job's result. The only
+difference is for ConflictErrors (tried up to five times) and ClientDisconnected
+(retried forever). Let's manufacture some of these problems.
+
+First, a few "normal" errors. They just go to the result.
+
+ >>> count = 0
+ >>> max = 50
+ >>> def raiseAnError(klass):
+ ... global count
+ ... count += 1
+ ... if count < max:
+ ... raise klass()
+ ... else:
+ ... print 'tried %d times. stopping.' % (count,)
+ ... return 42
+ ...
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError, ValueError)
+ >>> transaction.commit()
+ >>> job()
+ <zc.twist.Failure exceptions.ValueError>
+ >>> job.result
+ <zc.twist.Failure exceptions.ValueError>
+ >>> count
+ 1
+ >>> count = 0
+
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+ >>> transaction.commit()
+ >>> job()
+ <zc.twist.Failure exceptions.TypeError>
+ >>> job.result
+ <zc.twist.Failure exceptions.TypeError>
+ >>> count
+ 1
+ >>> count = 0
+
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError, RuntimeError)
+ >>> transaction.commit()
+ >>> job()
+ <zc.twist.Failure exceptions.RuntimeError>
+ >>> job.result
+ <zc.twist.Failure exceptions.RuntimeError>
+ >>> count
+ 1
+ >>> count = 0
+
+If we raise a ConflictError (or any TransactionError), that will try five
+times, and then be let through, so it will look very similar to the previous
+examples except for our ``count`` variable.
+
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+ ... ZODB.POSException.ConflictError)
+ >>> transaction.commit()
+ >>> job()
+ <zc.twist.Failure ZODB.POSException.ConflictError>
+ >>> job.result
+ <zc.twist.Failure ZODB.POSException.ConflictError>
+ >>> count
+ 5
+ >>> count = 0
+
+It's worth showing that, as you'd expect, if the job succeeds within the
+allotted retries, everything is fine.
+
+ >>> max = 3
+
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+ ... ZODB.POSException.ConflictError)
+ >>> transaction.commit()
+ >>> job()
+ tried 3 times. stopping.
+ 42
+ >>> job.result
+ 42
+ >>> count
+ 3
+ >>> count = 0
+
+If we raise a ClientDisconnected error, that will retry forever, with a backoff
+between attempts. Our job doesn't let this go on forever, but the retry policy
+would.
+
+ >>> max = 50
+ >>> del sleep_requests[:]
+ >>> job = root['j'] = zc.async.job.Job(raiseAnError,
+ ... ZEO.Exceptions.ClientDisconnected)
+ >>> transaction.commit()
+ >>> job()
+ tried 50 times. stopping.
+ 42
+ >>> count
+ 50
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 49
+ >>> count = 0
+
+None of the provided retry policies utilize this functionality, but the
+pertinent retry policy method, ``jobError``, can return not only ``True``
+(retry now) or ``False`` (do not retry) but also a datetime.datetime or a
+datetime.timedelta. Datetimes and timedeltas indicate that the policy requests
+that the job be returned to the queue to be claimed again. Let's give an
+example of this.
+
+As implied by this description, this functionality only makes sense within
+a fuller zc.async context. We need a stub Agent as a job's parent, and a stub
+Queue in its ``parent`` lineage.
+
+ >>> class StubQueue(persistent.Persistent):
+ ... zope.interface.implements(zc.async.interfaces.IQueue)
+ ... def putBack(self, job):
+ ... self.put_back = job
+ ... job.parent = self
+ ... def put(self, job, begin_after=None):
+ ... self.put_job = job
+ ... self.put_begin_after = begin_after
+ ... job.parent = self
+ ...
+ >>> class StubAgent(persistent.Persistent):
+ ... zope.interface.implements(zc.async.interfaces.IAgent)
+ ... def __init__(self):
+ ... # usually would have dispatcher agent parent, which would then
+ ... # have queue as parent, but this is a stub
+ ... self.parent = StubQueue()
+ ... def jobCompleted(self, job):
+ ... self.completed = job
+ ... def remove(self, job):
+ ... job.parent = None
+ ... self.removed = job
+ ...
+
+We'll also need a quick stub retry policy that takes advantage of this
+functionality.
+
+ >>> import persistent
+ >>> import datetime
+ >>> class StubRescheduleRetryPolicy(persistent.Persistent):
+ ... zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+ ... _reply = datetime.timedelta(hours=1)
+ ... def __init__(self, job):
+ ... self.parent = self.__parent__ = job
+ ... def jobError(self, failure, data_cache):
+ ... return self._reply
+ ... def commitError(self, failure, data_cache):
+ ... return self._reply
+ ... def interrupted(self):
+ ... return self._reply
+ ... def updateData(self, data_cache):
+ ... pass
+ ...
+
+ >>> j = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+ >>> agent = j.parent = StubAgent()
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> transaction.commit()
+ >>> j is j()
+ True
+
+Notice above that, when this happens, the result of the call is the job itself.
+
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> agent.removed is j
+ True
+ >>> agent.parent.put_job is j
+ True
+ >>> agent.parent.put_begin_after
+ datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+ >>> import pytz
+ >>> j = root['j'] = zc.async.job.Job(raiseAnError, TypeError)
+ >>> agent = j.parent = StubAgent()
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+ >>> transaction.commit()
+ >>> j is j()
+ True
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> agent.removed is j
+ True
+ >>> agent.parent.put_job is j
+ True
+ >>> agent.parent.put_begin_after
+ datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+------------------
+Failures on Commit
+------------------
+
+Failures on commit are handled very similarly to those occurring while
+performing the job.
+
+We'll have to hack ``TransactionManager.commit`` to show this.
+
+ >>> from transaction import TransactionManager
+ >>> old_commit = TransactionManager.commit
+ >>> commit_count = 0
+ >>> error = None
+ >>> max = 2
+ >>> allow_commits = [1] # change state to active
+ >>> def new_commit(self):
+ ... global commit_count
+ ... commit_count += 1
+ ... if commit_count in allow_commits:
+ ... old_commit(self) # changing state to "active" or similar
+ ... elif commit_count - len(allow_commits) < max:
+ ... raise error()
+ ... else:
+ ... if commit_count - len(allow_commits) == max:
+ ... print 'tried %d time(s). committing.' % (
+ ... commit_count-len(allow_commits))
+ ... old_commit(self)
+ ...
+ >>> TransactionManager.commit = new_commit
+ >>> count = 0
+ >>> def count_calls():
+ ... global count
+ ... count += 1
+ ... return 42
+ ...
+
+Normal errors will simply pass through with only one try: the commit of the
+result is not retried, and the commit error becomes the job's result.
+
+ >>> error = ValueError
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 2 time(s). committing.
+ <zc.twist.Failure exceptions.ValueError>
+ >>> count
+ 1
+
+Note that, since the error at commit is obscuring the original result, the
+original result is logged.
+
+ >>> for r in reversed(event_logs.records):
+ ... if r.levelname == 'INFO':
+ ... break
+ ... else:
+ ... assert False, 'could not find log'
+ ...
+ >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Commit failed ... Prior to this, job succeeded with result: 42
+
+This happens both for non-error results (as above) and for error results.
+
+ >>> count = commit_count = 0
+ >>> allow_commits = [1, 2, 3] # set to active, then get retry policy, then call
+ >>> j = root['j'] = zc.async.job.Job(raiseAnError, RuntimeError)
+ >>> transaction.commit()
+ >>> j()
+ tried 2 time(s). committing.
+ <zc.twist.Failure exceptions.ValueError>
+ >>> found = []
+ >>> for r in reversed(event_logs.records):
+ ... if r.levelname == 'ERROR':
+ ... found.append(r)
+ ... if len(found) == 2:
+ ... break
+ ... else:
+ ... assert False, 'could not find log'
+ ...
+ >>> print found[1].getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Commit failed ... Prior to this, job failed with traceback:
+ ...
+ exceptions.RuntimeError...
+
+The ValueError, raised at commit time, is the main error recorded.
+
+ >>> print found[0].getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ <...Job...raiseAnError... failed with traceback:
+ ...
+ exceptions.ValueError...
+
+The conflict error will be tried five times.
+
+ >>> max = 6
+ >>> count = commit_count = 0
+ >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13] # set active state, then get retries
+ >>> error = ZODB.POSException.ConflictError
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 6 time(s). committing.
+ <zc.twist.Failure ZODB.POSException.ConflictError>
+ >>> count
+ 5
+
+If it succeeds in the number of tries allotted, the result will be fine.
+
+ >>> max = 3
+ >>> count = commit_count = 0
+ >>> allow_commits = [1, 3, 4, 6, 7]
+ >>> error = ZODB.POSException.ConflictError
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 3 time(s). committing.
+ 42
+ >>> count
+ 3
+
+As an aside, the code regards obtaining the desired retry policy as a
+requirement, and so retries that step until it succeeds. Here's a repeat of
+the previous example, in which some of the retries themselves get a failed
+commit. Every error except a ConflictError triggers a backoff, which is why
+``sleep_requests`` stays empty here.
+
+ >>> max = 25
+ >>> count = commit_count = 0
+ >>> allow_commits = [1]
+ >>> error = ZODB.POSException.ConflictError
+ >>> del sleep_requests[:]
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 25 time(s). committing.
+ 42
+ >>> count
+ 2
+ >>> sleep_requests
+ []
+
+A ClientDisconnected error will be retried forever until it succeeds, with a
+backoff between attempts. We're putting in some irregular patterns of commits
+to test that these also work, which skews some of the results.
+
+ >>> max = 50
+ >>> count = commit_count = 0
+ >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 22, 25]
+ >>> del sleep_requests[:]
+ >>> error = ZEO.Exceptions.ClientDisconnected
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 50 time(s). committing.
+ 42
+ >>> count
+ 9
+ >>> len(sleep_requests)
+ 51
+
+None of the provided retry policies use this functionality, but the pertinent
+retry policy method here, ``commitError``, can likewise return not only
+``True`` (retry now) or ``False`` (do not retry) but also a datetime.datetime
+or a datetime.timedelta. Datetimes and timedeltas indicate that the policy
+requests that the job be returned to the queue to be claimed again. Let's give
+an example of this.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> agent = j.parent = StubAgent()
+ >>> StubRescheduleRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> count = commit_count = 0
+ >>> max = 1
+ >>> error = ValueError
+ >>> transaction.commit()
+ >>> j is j()
+ True
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> agent.removed is j
+ True
+ >>> agent.parent.put_job is j
+ True
+ >>> agent.parent.put_begin_after
+ datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> agent = j.parent = StubAgent()
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+ >>> count = commit_count = 0
+ >>> transaction.commit()
+ >>> j is j()
+ True
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> agent.removed is j
+ True
+ >>> agent.parent.put_job is j
+ True
+ >>> agent.parent.put_begin_after
+ datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+All of this looks very similar to job errors. The only big difference between
+the behavior of job errors and commit errors is that the commit of a failure
+result *must* succeed: it is retried forever until it commits or the job is
+interrupted. Here's an example with a ConflictError.
+
+ >>> count = commit_count = 0
+ >>> max = 50
+ >>> error = ZODB.POSException.ConflictError
+ >>> allow_commits = [1, 3, 4, 6, 7, 9, 10, 12, 13]
+ >>> j = root['j'] = zc.async.job.Job(count_calls)
+ >>> transaction.commit()
+ >>> j()
+ tried 50 time(s). committing.
+ <zc.twist.Failure ZODB.POSException.ConflictError>
+ >>> count
+ 5
+
+ >>> TransactionManager.commit = old_commit
+
+ >>> time.sleep = old_sleep # tearDown
+
+-------------------
+``handleInterrupt``
+-------------------
+
+Not infrequently, systems will be stopped while jobs are in the middle of their
+work. When the clean-up occurs, the job needs to figure out what to do to
+handle the interruption. The right thing to do is to call ``handleInterrupt``.
+This method itself defers to a retry policy to determine what to do.
+
+This method takes no arguments. The default behavior is to allow up to ten
+interruptions before giving up. The method relies on the job's ``parent``
+lineage (normally an agent, with a queue above it) to reschedule the job.
+Let's give an example.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> queue = j.parent = StubQueue()
+ >>> j._status = zc.async.interfaces.ACTIVE
+ >>> transaction.commit()
+ >>> j.handleInterrupt()
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> queue.put_back is j
+ True
+
+After nine interruptions have been retried, the default policy gives up on the
+tenth.
+
+ >>> for i in range(8):
+ ... j.parent = agent
+ ... j._status = zc.async.interfaces.ACTIVE
+ ... j.handleInterrupt()
+ ... if j.status != zc.async.interfaces.PENDING:
+ ... print 'error', i, j.status
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> j.parent = agent
+ >>> j._status = zc.async.interfaces.ACTIVE
+ >>> j.handleInterrupt()
+ >>> j.status == zc.async.interfaces.COMPLETED
+ True
+ >>> j.result
+ <zc.twist.Failure zc.async.interfaces.AbortedError>
+
+If the retry policy returns ``True``, as happens above, this should be
+interpreted by the agent and queue as a request to reschedule the job
+immediately. The policy can also return a datetime.timedelta or a
+datetime.datetime to ask that the job be rescheduled later.
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> queue = j.parent = StubQueue()
+ >>> StubRescheduleRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> j._status = zc.async.interfaces.ACTIVE
+ >>> transaction.commit()
+ >>> j.handleInterrupt()
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> queue.put_job is j
+ True
+ >>> queue.put_begin_after
+ datetime.datetime(2006, 8, 10, 16, 44, 22, 211, tzinfo=<UTC>)
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> queue = j.parent = StubQueue()
+ >>> j.retry_policy_factory = StubRescheduleRetryPolicy
+ >>> StubRescheduleRetryPolicy._reply = datetime.datetime(3000, 1, 1)
+ >>> j._status = zc.async.interfaces.ACTIVE
+ >>> transaction.commit()
+ >>> j.handleInterrupt()
+ >>> j.status == zc.async.interfaces.PENDING
+ True
+ >>> queue.put_job is j
+ True
+ >>> queue.put_begin_after
+ datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+In a full zc.async context, rescheduling is more of a dance between the job,
+the agent, and the queue. See `catastrophes.txt` for more information.
+
+-------------------
+``resumeCallbacks``
+-------------------
+
+``handleInterrupt`` should be called when a job was interrupted while working
+on its own code. But what happens if the system stops while a job was working
+on its callbacks? When that job is handled, the system should call
+``resumeCallbacks``. The method will call any callback that is still pending
+and not timed out because of ``begin_by``; it will call ``fail`` on any
+callback that is pending and timed out; it will call ``handleInterrupt`` on
+any callback that is active; it will call ``resumeCallbacks`` on any callback
+that itself has the CALLBACKS status; and it will ignore any callback that is
+completed. As such, it encompasses all of the retry behavior discussed above.
+Moreover, while it does not use retry policies directly, they are often used
+indirectly.
+
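+A rough sketch of that dispatch, based on the description above (this is an
+illustration, not the actual implementation in ``zc.async.job``)::
+
+    now = datetime.datetime.now(pytz.UTC)
+    for callback in job.callbacks:
+        status = callback.status
+        if status == zc.async.interfaces.PENDING:
+            if (callback.begin_by is not None and
+                callback.begin_after + callback.begin_by < now):
+                callback.fail()              # pending but timed out
+            else:
+                callback()                   # still pending: perform it
+        elif status == zc.async.interfaces.ACTIVE:
+            callback.handleInterrupt()       # interrupted mid-work
+        elif status == zc.async.interfaces.CALLBACKS:
+            callback.resumeCallbacks()       # resume its own callbacks
+        # COMPLETED callbacks are left alone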
+
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
>>> j._result = 10
>>> j._status = zc.async.interfaces.CALLBACKS
>>> completed_j = zc.async.job.Job(multiply, 3)
@@ -517,11 +1369,8 @@
80
>>> sub_callbacks_j.status == zc.async.interfaces.COMPLETED
True
- >>> print active_j.result.getTraceback()
- ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- Traceback (most recent call last):
- ...
- zc.async.interfaces.AbortedError:
+ >>> active_j.result # was retried because of default retry policy
+ 50
>>> active_j.status == zc.async.interfaces.COMPLETED
True
>>> pending_j.result
@@ -529,8 +1378,12 @@
>>> pending_j.status == zc.async.interfaces.COMPLETED
True
-[#aborted_active_callback_log]_
+[#retried_active_callback_log]_
+Introspection
+=============
+
+------------------------------------
Introspecting and Mutating Arguments
------------------------------------
@@ -567,6 +1420,7 @@
>>> res = j() # doctest: +ELLIPSIS
<zc.async.job.Job (oid ...>
+-----------------
Result and Status
-----------------
@@ -673,6 +1527,7 @@
- Also, a job's direct callback may not call the job
[#callback_self]_.
+----------------------
More Job Introspection
----------------------
@@ -769,6 +1624,8 @@
True
>>> transaction.abort()
+ >>> time.sleep = old_sleep # probably put in test tearDown
+
=========
Footnotes
=========
@@ -783,15 +1640,17 @@
>>> root = conn.root()
>>> import zc.async.configure
>>> zc.async.configure.base()
+ >>> import zc.async.testing
+ >>> zc.async.testing.setUpDatetime() # pins datetimes
.. [#verify] Verify interface
>>> from zope.interface.verify import verifyObject
>>> verifyObject(zc.async.interfaces.IJob, j)
True
-
+
Note that status and result are readonly.
-
+
>>> j.status = 1
Traceback (most recent call last):
...
@@ -804,9 +1663,9 @@
.. [#no_live_frames] Failures have two particularly dangerous bits: the
traceback and the stack. We use the __getstate__ code on Failures
to clean them up. This makes the traceback (``tb``) None...
-
+
>>> j.result.tb # None
-
+
...and it makes all of the values in the stack--the locals and
globals-- into strings. The stack is a list of lists, in which each
internal list represents a frame, and contains five elements: the
@@ -814,14 +1673,14 @@
the line number (``f_lineno``), an items list of the locals, and an
items list for the globals. All of the values in the items list
would normally be objects, but are now strings.
-
+
>>> for (codename, filename, lineno, local_i, global_i) in j.result.stack:
... for k, v in local_i:
... assert isinstance(v, basestring), 'bad local %s' % (v,)
... for k, v in global_i:
... assert isinstance(v, basestring), 'bad global %s' % (v,)
...
-
+
Here's a reasonable question. The Twisted Failure code has a
__getstate__ that cleans up the failure, and that's even what we are
using to sanitize the failure. If the failure is attached to a
@@ -845,7 +1704,7 @@
desired changes to the transaction (such as aborting) before the
job calls commit. Compare. Here is a call that raises an
exception, and rolls back changes.
-
+
(Note that we are passing arguments to the job, a topic that has
not yet been discussed in the text when this footnote is given: read
on a bit in the main text to see the details, if it seems surprising
@@ -942,14 +1801,16 @@
... assert False, 'could not find log'
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- <zc.async.job.Job (oid 114, db 'unnamed')
+ <zc.async.job.Job (oid ..., db 'unnamed')
``zc.async.job.completeStartedJobArguments(zc.async.job.Job
- (oid 109, db 'unnamed'))``>
+ (oid ..., db 'unnamed'))``>
succeeded with result:
None
- However, the callbacks that fail themselves are logged (again, as all jobs
- are) at the "error" level and include a detailed traceback.
+ However, the callbacks that fail themselves are logged (by default at a
+ "CRITICAL" level because callbacks are often in charge of error handling,
+ and having an error in your error handler may be a serious problem) and
+ include a detailed traceback.
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 4)
>>> transaction.commit()
@@ -957,8 +1818,8 @@
>>> j() # doctest: +ELLIPSIS
20
- >>> for r in reversed(trace_logs.records):
- ... if r.levelname == 'ERROR':
+ >>> for r in reversed(event_logs.records):
+ ... if r.levelname == 'CRITICAL':
... break
... else:
... assert False, 'could not find log'
@@ -966,29 +1827,204 @@
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<...Job...``zc.async.doctest_test.multiply()``> failed with traceback:
*--- Failure #... (pickled) ---
- .../zc/async/job.py:...
+ ...job.py:...
[ Locals...
- res : 'None...
- ( Globals...
- .../zc/async/job.py:...
- [ Locals...
effective_args : '[20]...
( Globals...
exceptions.TypeError: multiply() takes at least 2 arguments (1 given)
*--- End of Failure #... ---
<BLANKLINE>
-.. [#aborted_active_callback_log] The fact that the pseudo-aborted "active"
- job failed is logged.
+.. [#show_other_policies]
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> j.retry_policy_factory = zc.async.job.RetryCommonForever
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.RetryCommonForever)
+ True
+
+ Here's the policy requesting that a job be tried "forever" (we show 50
+ times), both in the job and in the commit.
+
+ >>> import zc.twist
+ >>> import ZODB.POSException
+ >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+ >>> data = {}
+
+ >>> for i in range(50):
+ ... if not policy.jobError(conflict, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+ >>> for i in range(50):
+ ... if not policy.commitError(conflict, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+ A ZEO ClientDisconnected error will also be retried forever, but with a
+ backoff.
+
+ >>> import ZEO.Exceptions
+ >>> disconnect = zc.twist.Failure(ZEO.Exceptions.ClientDisconnected())
+ >>> del sleep_requests[:]
+
+ >>> for i in range(50):
+ ... if not policy.jobError(disconnect, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 50
+ >>> del sleep_requests[:]
+
+ >>> data = {}
+ >>> for i in range(50):
+ ... if not policy.commitError(disconnect, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+ >>> sleep_requests # doctest: +ELLIPSIS
+ [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, ..., 60]
+ >>> len(sleep_requests)
+ 50
+
+ If we encounter another kind of error during the job, no retries are
+ requested.
+
+ >>> runtime = zc.twist.Failure(RuntimeError())
+ >>> policy.jobError(runtime, data)
+ False
+ >>> value = zc.twist.Failure(ValueError())
+ >>> policy.jobError(value, data)
+ False
+
+ However, during the commit, any kind of error will cause a retry, forever.
+
+ >>> for i in range(50):
+ ... if not policy.commitError(runtime, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+ >>> for i in range(50):
+ ... if not policy.commitError(value, data):
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+  When the system is interrupted, ``handleInterrupt`` uses the retry policy's
+  ``interrupted`` method to determine what to do. It does not need to pass a
+  data dictionary. This, too, retries forever.
+
+ >>> for i in range(50):
+ ... if not policy.interrupted():
+ ... print 'error'
+ ... break
+ ... else:
+ ... print 'success'
+ ...
+ success
+
+  ``updateData`` is only used after ``jobError`` and ``commitError``, and is
+  called on every commit attempt, so calling it here is wildly out of order;
+  but we'll call the method now anyway to show that we can, and then perform
+  the job.
+
+ >>> transaction.commit()
+ >>> policy.updateData(data)
+ >>> j()
+ 10
+
+  Now we will show ``NeverRetry``. As the name implies, this is a simple
+  policy for jobs that should never retry. Typical reasons for this are that
+  the job is talking to an external system or doing something else that is
+  effectively not transactional. More sophisticated policies for specific
+  versions of this scenario may be possible; however, also consider callbacks
+  for this use case.
+
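+  Conceptually, such a policy simply answers "no" to every question the job
+  asks it. A minimal sketch of that shape (an illustration, not the actual
+  ``zc.async.job.NeverRetry`` implementation) might look like this::
+
+      class AlwaysSayNo(persistent.Persistent):
+          zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+          def __init__(self, job):
+              self.parent = self.__parent__ = job
+          def jobError(self, failure, data_cache):
+              return False  # never retry after an error in the job's code
+          def commitError(self, failure, data_cache):
+              return False  # never retry after an error during commit
+          def interrupted(self):
+              return False  # never retry after an interruption
+          def updateData(self, data_cache):
+              pass
+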
+ >>> j = root['j'] = zc.async.job.Job(multiply, 5, 2)
+ >>> j.retry_policy_factory = zc.async.job.NeverRetry
+ >>> policy = j.getRetryPolicy()
+ >>> isinstance(policy, zc.async.job.NeverRetry)
+ True
+
+ So, here's a bunch of "no"s.
+
+ >>> import zc.twist
+ >>> import ZODB.POSException
+ >>> conflict = zc.twist.Failure(ZODB.POSException.ConflictError())
+ >>> data = {}
+
+ >>> policy.jobError(conflict, data)
+ False
+
+ >>> policy.commitError(conflict, data)
+ False
+
+ >>> policy.jobError(disconnect, data)
+ False
+
+ >>> policy.commitError(disconnect, data)
+ False
+
+ >>> runtime = zc.twist.Failure(RuntimeError())
+ >>> policy.jobError(runtime, data)
+ False
+
+ >>> policy.commitError(runtime, data)
+ False
+
+ >>> value = zc.twist.Failure(ValueError())
+ >>> policy.jobError(value, data)
+ False
+
+ >>> policy.commitError(value, data)
+ False
+
+ >>> policy.interrupted()
+ False
+
+  ``updateData`` is only used after ``jobError`` and ``commitError``, and is
+  called on every commit attempt, so calling it here is wildly out of order;
+  but we'll call the method now anyway to show that we can, and then perform
+  the job.
+
+ >>> transaction.commit()
+ >>> policy.updateData(data)
+ >>> j()
+ 10
+
+.. [#retried_active_callback_log] The fact that the pseudo-aborted "active"
+ job was retried is logged.
+
>>> for r in reversed(trace_logs.records):
- ... if r.levelname == 'DEBUG' and r.getMessage().startswith('failing'):
+ ... if (r.levelname == 'DEBUG' and
+ ... r.getMessage().startswith('retrying interrupted')):
... break
... else:
... assert False, 'could not find log'
...
>>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- failing aborted callback
+ retrying interrupted callback
<...Job...``zc.async.doctest_test.multiply(5)``> to <...Job...>
.. [#call_self] Here's a job trying to call itself.
Modified: zc.async/trunk/src/zc/async/queue.py
===================================================================
--- zc.async/trunk/src/zc/async/queue.py 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/queue.py 2008-06-20 01:18:18 UTC (rev 87584)
@@ -13,6 +13,7 @@
##############################################################################
import datetime
import bisect
+import logging
import pytz
import persistent
import persistent.interfaces
@@ -23,6 +24,7 @@
import zope.component
import zope.event
import zope.bforest
+import zope.minmax
import zc.queue
import zc.dict
@@ -43,22 +45,20 @@
UUID = None
activated = None
-
+
def __init__(self, uuid):
super(DispatcherAgents, self).__init__()
self.UUID = uuid
- self.__class__.last_ping.initialize(self)
-
- zc.async.utils.createAtom('last_ping', None)
-
+ self.last_ping = zope.minmax.Maximum()
+
ping_interval = datetime.timedelta(seconds=30)
ping_death_interval = datetime.timedelta(seconds=60)
@property
def dead(self):
- last_ping = self.last_ping
+ last_ping = self.last_ping.value
if self.activated and (
- self.last_ping is None or self.activated > self.last_ping):
+ last_ping is None or self.activated > last_ping):
last_ping = self.activated
elif last_ping is None:
return False
@@ -103,27 +103,43 @@
else:
while job is not None:
status = job.status
- if status == zc.async.interfaces.ASSIGNED:
+ if status in (zc.async.interfaces.PENDING,
+ zc.async.interfaces.ASSIGNED):
+ # odd
+ zc.async.log.warning(
+ 'unexpected job status %s for %r; treating as NEW',
+ status, job)
+ status = zc.async.interfaces.NEW
+ if status == zc.async.interfaces.NEW:
tmp = job.assignerUUID
job.assignerUUID = None
job.parent = None
queue.put(job)
job.assignerUUID = tmp
elif job.status == zc.async.interfaces.ACTIVE:
- queue.put(job.fail)
+ j = queue.put(
+ job.handleInterrupt,
+ retry_policy_factory=zc.async.job.RetryCommonForever,
+ failure_log_level=logging.CRITICAL)
elif job.status == zc.async.interfaces.CALLBACKS:
- queue.put(job.resumeCallbacks)
+ j = queue.put(
+ job.resumeCallbacks,
+ retry_policy_factory=zc.async.job.RetryCommonForever,
+ failure_log_level=logging.CRITICAL)
elif job.status == zc.async.interfaces.COMPLETED:
# huh, that's odd.
agent.completed.add(job)
+ zc.async.utils.log.warning(
+ 'unexpectedly had to inform agent of completion '
+ 'of %r', job)
try:
job = agent.pull()
except IndexError:
job = None
zope.event.notify(
zc.async.interfaces.DispatcherDeactivated(self))
-
+
class Queues(zc.async.utils.Dict):
def __setitem__(self, key, value):
@@ -161,9 +177,10 @@
if not da.activated:
raise ValueError('UUID is not activated.')
now = datetime.datetime.now(pytz.UTC)
- if (da.last_ping is None or
- da.last_ping + da.ping_interval <= now):
- da.last_ping = now
+ last_ping = da.last_ping.value
+ if (last_ping is None or
+ last_ping + da.ping_interval <= now):
+ da.last_ping.value = now
next = self._getNextActiveSibling(uuid)
if next is not None and next.dead:
# `next` seems to be a dead dispatcher.
@@ -188,28 +205,37 @@
self.size = size
def clean(self):
+ now = datetime.datetime.now(pytz.UTC)
for i, job in enumerate(reversed(self._data)):
- if job.status in (
- zc.async.interfaces.CALLBACKS,
- zc.async.interfaces.COMPLETED):
+ status = job.status
+ if status in (zc.async.interfaces.CALLBACKS,
+ zc.async.interfaces.COMPLETED) or (
+ status == zc.async.interfaces.PENDING and
+ job.begin_after > now): # for a rescheduled task
self._data.pull(-1-i)
@property
def filled(self):
return len(self._data) >= self.size
+ def __contains__(self, item):
+ for i in self:
+ if i is item:
+ return True
+ return False
+
def add(self, item):
+ if item in self:
+ return
if not zc.async.interfaces.IJob.providedBy(item):
raise ValueError('must be IJob')
if self.name not in item.quota_names:
raise ValueError('quota name must be in quota_names')
- # self.clean()
if self.filled:
raise ValueError('Quota is filled')
self._data.put(item)
- for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'get',
- '__contains__'):
+ for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'get'):
locals()[nm] = zc.async.utils.simpleWrapper(nm)
@@ -229,6 +255,8 @@
class Queue(zc.async.utils.Base):
zope.interface.implements(zc.async.interfaces.IQueue)
+ _putback_queue = None
+
def __init__(self):
self._queue = zc.queue.CompositeQueue()
self._held = BTrees.OOBTree.OOBTree()
@@ -238,9 +266,14 @@
self.dispatchers = Dispatchers()
self.dispatchers.__parent__ = self
- def put(self, item, begin_after=None, begin_by=None):
+ def put(self, item, begin_after=None, begin_by=None,
+ failure_log_level=None, retry_policy_factory=None):
item = zc.async.interfaces.IJob(item)
- if item.assignerUUID is not None:
+ if failure_log_level is not None:
+ item.failure_log_level = failure_log_level
+ if retry_policy_factory is not None:
+ item.retry_policy_factory = retry_policy_factory
+ if item.status != zc.async.interfaces.NEW:
raise ValueError(
'cannot add already-assigned job')
for name in item.quota_names:
@@ -253,10 +286,9 @@
item.begin_after = now
if begin_by is not None:
item.begin_by = begin_by
- elif item.begin_by is None:
- item.begin_by = datetime.timedelta(hours=1) # good idea?
- item.assignerUUID = zope.component.getUtility(
- zc.async.interfaces.IUUID)
+ if item.assignerUUID is not None: # rescheduled job keeps old UUID
+ item.assignerUUID = zope.component.getUtility(
+ zc.async.interfaces.IUUID)
if item._p_jar is None:
# we need to do this if the job will be stored in another
# database as well during this transaction. Also, _held storage
@@ -274,7 +306,35 @@
self._length.change(1)
return item
+ def putBack(self, item):
+ # an agent has claimed a job, but now the job needs to be returned. the
+ # only current caller for this is a job's ``handleInterrupt`` method.
+ # The scenario for this is that the agent's dispatcher died while the
+ # job was active, interrupting the work; and the job's retry policy
+ # asks that the job be put back on the queue to be claimed immediately.
+ # This method puts the job in a special internal queue that ``_iter``
+ # looks at first. This allows jobs to maintain their order, if needed,
+ # within a quota.
+ assert zc.async.interfaces.IJob.providedBy(item)
+ assert item.status == zc.async.interfaces.NEW, item.status
+ assert item.begin_after is not None
+ assert item._p_jar is not None
+ # to support legacy instances of the queue that were created before
+ # this functionality and its separate internal data structure were
+ # part of the code, we instantiate the _putback_queue when we first
+ # need it, here.
+ if self._putback_queue is None:
+ self._putback_queue = zc.queue.CompositeQueue()
+ self._putback_queue.put(item)
+ item.parent = self
+ self._length.change(1)
+
def _iter(self):
+ putback_queue = self._putback_queue
+ if putback_queue: # not None and not empty
+ dq_pop = putback_queue.pull
+ for dq_ix, dq_next in enumerate(putback_queue):
+ yield dq_pop, dq_ix, dq_next
queue = self._queue
tree = self._held
q = enumerate(queue)
@@ -331,16 +391,17 @@
if not self._length():
return default
uuid = None
+ quotas_cleaned = set()
for pop, ix, job in self._iter():
if job.begin_after > now:
break
res = None
quotas = []
- if (job.begin_after + job.begin_by) < now:
- res = zc.async.interfaces.IJob(
- job.fail) # specify TimeoutError?
+ if (job.begin_by is not None and
+ (job.begin_after + job.begin_by) < now):
+ res = zc.async.interfaces.IJob(job.fail)
+ res.args.append(zc.async.interfaces.TimeoutError())
res.begin_after = now
- res.begin_by = datetime.timedelta(hours=1)
res.parent = self
if uuid is None:
uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
@@ -349,8 +410,10 @@
for name in job.quota_names:
quota = self.quotas.get(name)
if quota is not None:
- quota.clean()
- if quota.filled:
+ if name not in quotas_cleaned:
+ quota.clean()
+ quotas_cleaned.add(name)
+ if quota.filled and job not in quota:
break
quotas.append(quota)
else:
Modified: zc.async/trunk/src/zc/async/queue.txt
===================================================================
--- zc.async/trunk/src/zc/async/queue.txt 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/queue.txt 2008-06-20 01:18:18 UTC (rev 87584)
@@ -24,7 +24,7 @@
[#queues_collection]_ As shown in the README.txt of this package (or see
zc.async.adapters.defaultQueueAdapter), the queue with the name '' will
typically be registered as an adapter to persistent objects that
-provides zc.async.interfaces.IQueue [#verify]_.
+provides zc.async.interfaces.IQueue [#verify]_.
The queue doesn't have any jobs yet.
@@ -82,25 +82,27 @@
>>> job.parent is queue
True
>>> transaction.commit()
-
+
A job added without any special calls gets a `begin_after` attribute
of now.
>>> import datetime
>>> import pytz
- >>> now = datetime.datetime.now(pytz.UTC)
+ >>> now = datetime.datetime.now(pytz.UTC)
>>> now
datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=<UTC>)
>>> job.begin_after == now
True
-A ``begin_by`` attribute is a duration, and defaults to one hour. This
-means that it must be completed an hour after the ``begin_after`` datetime,
-or else the system will fail it.
+A ``begin_by`` attribute is a duration (datetime.timedelta) or None, and
+defaults to None.
- >>> job.begin_by == datetime.timedelta(hours=1)
+ >>> job.begin_by == None
True
+If ``begin_by`` were an hour, that would mean that the job must be begun
+within an hour of the ``begin_after`` datetime, or else the system will fail
+it.
+
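+For instance (an illustrative snippet, not executed as part of this document),
+a one-hour limit could be requested when putting a job, since ``put`` accepts
+``begin_by`` as a keyword argument::
+
+    job = queue.put(mock_work, begin_by=datetime.timedelta(hours=1))
+    # or, equivalently, set the attribute after putting the job:
+    # job.begin_by = datetime.timedelta(hours=1)
+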
Now let's add a job to be performed later, using ``begin_after``.
This means that it's immediately ready to be performed: we can ``claim`` it.
@@ -240,9 +242,9 @@
- If begin_after + begin_by >= now, a job that makes the original job fail
is used instead.
-- If a job has one or more ``quota_names`` and the associated quotas are
- filled with jobs not in the CALLBACKS or COMPLETED status then it will
- not be returned.
+- If a job is not failing and has one or more ``quota_names`` and the
+ associated quotas are filled with jobs not in the CALLBACKS or COMPLETED
+ status then it will not be returned.
- It does not take an index argument.
@@ -391,22 +393,46 @@
... @property
... def queue(self):
... return self.parent.parent
- ... def claimJob(self):
+ ... def claimJob(self, filter=None):
... if len(self) < self.size:
- ... job = self.queue.claim()
+ ... job = self.queue.claim(filter=filter)
... if job is not None:
... job.parent = self
... self.append(job)
... return job
- ... def pull(self):
- ... return self.pop(0)
+ ... def pull(self, index=0):
+ ... res = self.pop(index)
+ ... res.parent = None
+ ... return res
+ ... def remove(self, item):
+ ... self.pull(self.index(item))
... def jobCompleted(self, job):
+ ... persistent.list.PersistentList.remove(self, job)
... self.completed.add(job)
- ...
+ ... def reschedule(self, item, when):
+ ... now = datetime.datetime.utcnow()
+ ... if isinstance(when, datetime.datetime):
+    ...             self.remove(item)
+ ... if when.tzinfo is not None:
+ ... when = when.astimezone(pytz.UTC).replace(tzinfo=None)
+ ... if when <= now:
+ ... self.queue.disclaim(item)
+ ... else:
+ ... self.queue.put(item, begin_after=when)
+ ... elif isinstance(when, datetime.timedelta):
+    ...             self.remove(item)
+ ... if when <= datetime.timedelta():
+ ... self.queue.disclaim(item)
+ ... else:
+ ... self.queue.put(item, begin_after=now+when)
+ ... else:
+ ... raise TypeError('``when`` must be datetime or timedelta')
+ ...
>>> job4 is queue.claim()
True
>>> job4.parent = StubAgent()
+ >>> job4.parent.append(job4)
>>> job4.status == zc.async.interfaces.ASSIGNED
True
>>> list(quota) == [job4]
@@ -426,23 +452,25 @@
timed out are returned in wrappers that fail the original job.
>>> job9_from_outer_space = queue.put(mock_work)
+ >>> job9_from_outer_space.begin_by = datetime.timedelta(hours=1)
>>> zc.async.testing.set_now(
... datetime.datetime.now(pytz.UTC) +
... job9_from_outer_space.begin_by + datetime.timedelta(seconds=1))
>>> job9 = queue.claim()
>>> job9 is job9_from_outer_space
False
- >>> stub = root['stub'] = StubAgent()
+ >>> stub = root['stub'] = StubAgent()
>>> job9.parent = stub
+ >>> stub.append(job9)
>>> transaction.commit()
>>> job9()
>>> job9_from_outer_space.status == zc.async.interfaces.COMPLETED
True
>>> print job9_from_outer_space.result.getTraceback()
+ ... # doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
- Failure: zc.async.interfaces.AbortedError:
+ Failure: zc.async.interfaces.TimeoutError:
<BLANKLINE>
-
Dispatchers
===========
@@ -492,12 +520,12 @@
>>> print da.activated
None
- >>> print da.last_ping
+ >>> print da.last_ping.value
None
-When the object's ``last_ping`` + ``ping_interval`` is greater than now,
-a new ``last_ping`` should be recorded, as we'll see below. If the
-``last_ping`` (or ``activated``, if more recent) +
+When the object's ``last_ping.value`` + ``ping_interval`` is no longer greater
+than now, a new ``last_ping.value`` should be recorded, as we'll see below. If
+the ``last_ping.value`` (or ``activated``, if more recent) +
``ping_death_interval`` is older than now, the dispatcher is considered to
be ``dead``.
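+
+A sketch of that check (an illustration mirroring the ``dead`` property in
+``queue.py``, not new API)::
+
+    last = da.last_ping.value
+    if da.activated and (last is None or da.activated > last):
+        last = da.activated       # activation is more recent than any ping
+    dead = (last is not None and
+            last + da.ping_death_interval < datetime.datetime.now(pytz.UTC))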
@@ -514,7 +542,7 @@
>>> import pytz
>>> now = datetime.datetime.now(pytz.UTC)
>>> da.activate()
- >>> now <= da.activated <= datetime.datetime.now(pytz.UTC)
+ >>> now <= da.activated <= datetime.datetime.now(pytz.UTC)
True
It's still not dead. :-)
@@ -638,10 +666,10 @@
>>> before = datetime.datetime.now(pytz.UTC)
>>> pollStub(conn)
- >>> before <= da.last_ping <= datetime.datetime.now(pytz.UTC)
+ >>> before <= da.last_ping.value <= datetime.datetime.now(pytz.UTC)
True
-We don't have any jobs to claim yet. Let's add one and do it again.
+We don't have any jobs to claim yet. Let's add one and do it again.
We'll use a test fixture, time_flies, to make the time change.
>>> job10 = queue.put(mock_work)
@@ -652,10 +680,10 @@
... datetime.timedelta(seconds=seconds))
...
- >>> last_ping = da.last_ping
+ >>> last_ping = da.last_ping.value
>>> time_flies(5)
>>> pollStub(conn)
- >>> da.last_ping == last_ping
+ >>> da.last_ping.value == last_ping
True
>>> len(jobs_to_do)
@@ -669,11 +697,11 @@
>>> time_flies(10)
>>> pollStub(conn)
- >>> da.last_ping == last_ping
+ >>> da.last_ping.value == last_ping
True
>>> time_flies(15)
>>> pollStub(conn)
- >>> da.last_ping > last_ping
+ >>> da.last_ping.value > last_ping
True
Dead Dispatchers
@@ -684,7 +712,7 @@
then proceed. But what if a queue has more than one simultaneous
dispatcher? How do we know to clean out the dead dispatcher's jobs?
-The ``ping`` method not only changes the ``last_ping`` but checks the
+The ``ping`` method not only changes the ``last_ping.value`` but checks the
next sibling dispatcher, as defined by UUID, to make sure that it is not
dead. It uses the ``dead`` attribute, introduced above, to test whether
the sibling is alive.
@@ -708,8 +736,8 @@
False
Let's do that again, with an agent in the new dispatcher and some jobs in the
-agent. Assigned jobs will be reassigned; in-progress jobs will have a
-new task that fails them; callback jobs will resume their callback; and
+agent. Assigned jobs will be reassigned; in-progress jobs will have a new task
+that calls ``handleInterrupt``; callback jobs will resume their callback; and
completed jobs will be moved to the completed collection.
>>> alt_agent = alt_da['main'] = StubAgent()
@@ -755,16 +783,385 @@
4
>>> queue[1] is jobA
True
- >>> queue[2].callable == jobB.fail
+ >>> queue[2].callable == jobB.handleInterrupt
True
>>> queue[3].callable == jobC.resumeCallbacks
True
>>> alt_agent.completed.first() is jobD
True
+As an aside, notice that the ``handleInterrupt`` and ``resumeCallbacks`` jobs
+have custom error log levels, and custom retry policies.
+
+ >>> import logging
+ >>> queue[2].failure_log_level == logging.CRITICAL
+ True
+ >>> queue[3].failure_log_level == logging.CRITICAL
+ True
+ >>> queue[2].retry_policy_factory is zc.async.job.RetryCommonForever
+ True
+ >>> queue[3].retry_policy_factory is zc.async.job.RetryCommonForever
+ True
+
If you have multiple workers, it is strongly suggested that you get the
associated servers connected to a shared time server.
+Rescheduled Jobs
+----------------
+
+When a job is interrupted--it was active when it stopped--it asks its retry
+policy whether to retry. The policy can respond in one of three ways:
+
+- yes, please retry right now;
+
+- yes, please retry later; or
+
+- no, please do not retry, and fail instead.
+
+All three of these possibilities potentially affect the queue in different
+ways, so we will look at examples.
+
+First, let's look at the simple cases for all three.
+
+The scenario above lets us look at what is probably the most common case: the
+default retry policy. Let's claim the ``handleInterrupt`` task.
+
+ >>> len(queue)
+ 4
+ >>> j = alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ >>> j.callable == jobB.handleInterrupt
+ True
+ >>> len(queue)
+ 3
+
+The default retry policy's behavior is to retry nine times, for a total of ten
+attempts, and to request that the original job be retried as soon as
+possible--first in line. That behavior will become more interesting when we
+look at interaction with quotas below. But now, performing this job will push
+jobB back in the queue--again, notably, in very first place in the queue.
+
+ >>> j()
+ >>> len(queue)
+ 4
+ >>> queue[0] is jobB
+ True
+ >>> jobB is alt_agent.claimJob()
+ True
+ >>> jobB()
+ 42
+ >>> len(queue)
+ 3
+
+That was an example of the retry policy saying "please retry right now":
+returning ``True``. Now let's look at the other two possible responses.
+
+"please retry later": with a datetime.
+
+ >>> import persistent
+ >>> import datetime
+ >>> class StubRetryPolicy(persistent.Persistent):
+ ... zope.interface.implements(zc.async.interfaces.IRetryPolicy)
+ ... _reply = datetime.datetime(3000, 1, 1)
+ ... def __init__(self, job):
+ ... self.parent = self.__parent__ = job
+ ... def jobError(self, failure, data_cache):
+ ... return self._reply
+ ... def commitError(self, failure, data_cache):
+ ... return self._reply
+ ... def interrupted(self):
+ ... return self._reply
+ ... def updateData(self, data_cache):
+ ... pass
+ ...
+
+ >>> j = zc.async.job.Job(mock_work)
+ >>> j._status = zc.async.interfaces.ACTIVE # hack internals for test
+ >>> j.retry_policy_factory = StubRetryPolicy
+ >>> j_wrap = queue.put(j.handleInterrupt)
+ >>> len(queue)
+ 4
+ >>> j_wrap is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ True
+ >>> len(queue)
+ 3
+ >>> j_wrap()
+ >>> len(queue)
+ 4
+ >>> queue[3] is j
+ True
+ >>> j.begin_after
+ datetime.datetime(3000, 1, 1, 0, 0, tzinfo=<UTC>)
+
+...with a timedelta.
+
+ >>> j = zc.async.job.Job(mock_work)
+ >>> j._status = zc.async.interfaces.ACTIVE # hack internals for test
+ >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j.retry_policy_factory = StubRetryPolicy
+ >>> j_wrap = queue.put(j.handleInterrupt)
+ >>> len(queue)
+ 5
+ >>> j_wrap is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ True
+ >>> len(queue)
+ 4
+ >>> j_wrap()
+ >>> len(queue)
+ 5
+ >>> queue[3] is j
+ True
+ >>> j.begin_after
+ datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+"please do not retry": does not affect the queue, so we will not show it here.
+
+The trickiest aspects occur with quotas.
+
+"please retry now": with a quota. In this case, the job should retain its
+precise place in the quota if the quota has a limit of one, and should remain
+in the quota if the quota has a greater limit.
+
+ >>> quota = queue.quotas['content catalog']
+ >>> quota[0]()
+ 42
+ >>> quota.clean()
+ >>> len(quota)
+ 0
+ >>> j = queue.put(mock_work)
+ >>> j.quota_names = ('content catalog',)
+ >>> j2 = queue.put(mock_work)
+ >>> j2.quota_names = ('content catalog',)
+ >>> transaction.commit()
+ >>> len(queue)
+ 7
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ True
+ >>> list(quota) == [j]
+ True
+
+ >>> j._status = zc.async.interfaces.ACTIVE # fake beginning to call
+ >>> alt_agent.remove(j)
+ >>> j_wrap = queue.put(j.handleInterrupt)
+
+The ``j`` job still claims the space in the quota, so ``j2`` can't be
+claimed.
+
+ >>> print alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ None
+
+Now we'll get the ``handleInterrupt`` task. The ``j`` job still cannot be
+removed from the quota.
+
+ >>> j_wrap is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ True
+ >>> len(quota)
+ 1
+ >>> quota.clean()
+ >>> list(quota) == [j]
+ True
+
+Now we'll perform the task. The ``j`` job returns to the queue in a
+``PENDING`` status, and the quota still cannot be cleared.
+
+ >>> j_wrap()
+ >>> len(queue)
+ 7
+ >>> queue[0] is j
+ True
+ >>> len(quota)
+ 1
+ >>> quota.clean()
+ >>> list(quota) == [j]
+ True
+
+The agent can claim job ``j`` again, however.
+
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ True
+
+Still cannot clear the quota.
+
+ >>> len(quota)
+ 1
+ >>> quota.clean()
+ >>> list(quota) == [j]
+ True
+
+Once we perform ``j``, the quota can be cleared, and ``j2`` can be claimed, as
+usual.
+
+ >>> j()
+ 42
+
+ >>> j2 is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ True
+
+"please retry later": with a quota. In this case, other jobs can take the
+place of the original job in the quota.
+
+ >>> j3 = queue.put(mock_work)
+ >>> j3.quota_names = ('content catalog',)
+ >>> transaction.commit()
+ >>> j2.retry_policy_factory = StubRetryPolicy
+ >>> list(quota) == [j2]
+ True
+
+ >>> j2._status = zc.async.interfaces.ACTIVE # fake beginning to call
+ >>> alt_agent.remove(j2)
+ >>> j2_wrap = queue.put(j2.handleInterrupt)
+ >>> j2_wrap is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'handleInterrupt')
+ ...
+ True
+ >>> len(queue)
+ 6
+ >>> j2_wrap()
+ >>> len(queue)
+ 7
+ >>> queue[-2] is j2
+ True
+ >>> j2.status == zc.async.interfaces.PENDING
+ True
+
+ >>> j3 is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... 'content catalog' in candidate.quota_names)
+ ...
+ True
+ >>> j3()
+ 42
+
+A retry policy can also request rescheduling from a ``jobError`` or a
+``commitError``, although the default retry policies do not.
+
+jobError: "please reschedule now"
+
+ >>> def raiseAnError():
+ ... raise RuntimeError
+ ...
+ >>> StubRetryPolicy._reply = datetime.timedelta()
+ >>> j = queue.put(raiseAnError, retry_policy_factory=StubRetryPolicy)
+ >>> len(queue)
+ 7
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'raiseAnError')
+ ...
+ True
+ >>> len(queue)
+ 6
+ >>> j() is j
+ True
+ >>> len(queue)
+ 7
+ >>> queue.claim() is j
+ True
+
+jobError: "please reschedule later"
+
+ >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j = queue.put(raiseAnError, retry_policy_factory=StubRetryPolicy)
+ >>> len(queue)
+ 7
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'raiseAnError')
+ ...
+ True
+ >>> len(queue)
+ 6
+ >>> j() is j
+ True
+ >>> len(queue)
+ 7
+ >>> queue[-2] is j
+ True
+ >>> j.begin_after
+ datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+commitError: "please reschedule now"
+
+ >>> error_flag = False
+ >>> old_commit = transaction.TransactionManager.commit
+ >>> def commit(self):
+ ... global error_flag
+ ... if error_flag:
+ ... error_flag = False
+ ... raise ValueError()
+ ... old_commit(self)
+ ...
+ >>> transaction.TransactionManager.commit = commit
+ >>> def causeCommitError():
+ ... global error_flag
+ ... error_flag = True
+ ...
+ >>> StubRetryPolicy._reply = datetime.timedelta()
+ >>> j = queue.put(causeCommitError, retry_policy_factory=StubRetryPolicy)
+ >>> len(queue)
+ 8
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'causeCommitError')
+ ...
+ True
+ >>> len(queue)
+ 7
+ >>> j() is j
+ True
+ >>> len(queue)
+ 8
+ >>> queue.claim() is j
+ True
+
+commitError: "please reschedule later"
+
+ >>> StubRetryPolicy._reply = datetime.timedelta(hours=1)
+ >>> j = queue.put(causeCommitError, retry_policy_factory=StubRetryPolicy)
+ >>> len(queue)
+ 8
+ >>> j is alt_agent.claimJob(
+ ... filter=lambda candidate:
+ ... candidate.callable.__name__ == 'causeCommitError')
+ ...
+ True
+ >>> len(queue)
+ 7
+ >>> j() is j
+ True
+ >>> len(queue)
+ 8
+ >>> queue[-2] is j
+ True
+ >>> j.begin_after
+ datetime.datetime(2006, 8, 10, 18, 32, 33, tzinfo=<UTC>)
+
+
+ >>> transaction.TransactionManager.commit = old_commit
+
=========
Footnotes
=========
@@ -801,12 +1198,12 @@
Traceback (most recent call last):
...
KeyError: 'foo'
-
+
>>> container['foo'] = None
Traceback (most recent call last):
...
ValueError: value must be IQueue
-
+
>>> del container['']
>>> len(container)
0
Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/testing.py 2008-06-20 01:18:18 UTC (rev 87584)
@@ -13,7 +13,9 @@
##############################################################################
import threading
import bisect
-import time
+from time import sleep as time_sleep # this import style is intentional, so
+# that test monkeypatching of time.sleep does not affect the usage in this
+# module
import datetime
import pytz
@@ -51,6 +53,9 @@
raw = super(_datetime, self).__repr__()
return "datetime.datetime%s" % (
raw[raw.index('('):],)
+ def __add__(self, other):
+ return _datetime(
+ *super(_datetime,self).__add__(other).__reduce__()[1])
def __reduce__(self):
return (argh, super(_datetime, self).__reduce__()[1])
def argh(*args, **kwargs):
@@ -103,7 +108,7 @@
assert _when == 'before' and _event == 'shutdown', (
'unsupported trigger')
self.triggers.append((_when, _event, _callable, args, kwargs))
-
+
def callInThread(self, _callable, *args, **kw):
# very naive should be fine...
thread = threading.Thread(target=_callable, args=args, kwargs=kw)
@@ -177,7 +182,7 @@
break
else:
break
- time.sleep(0.1)
+ time_sleep(0.1)
else:
print 'TIME OUT'
@@ -189,7 +194,7 @@
for i in range(seconds * 10):
if len(dispatcher.polls) > count:
return dispatcher.polls.first()
- time.sleep(0.1)
+ time_sleep(0.1)
else:
assert False, 'no poll!'
@@ -198,7 +203,7 @@
t = transaction.begin()
if job.status == zc.async.interfaces.COMPLETED:
return job.result
- time.sleep(0.1)
+ time_sleep(0.1)
else:
assert False, 'job never completed'
@@ -208,6 +213,6 @@
t = transaction.begin()
if name in job.annotations:
return job.annotations[name]
- time.sleep(0.1)
+ time_sleep(0.1)
else:
assert False, 'annotation never found'
Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py 2008-06-20 01:06:28 UTC (rev 87583)
+++ zc.async/trunk/src/zc/async/utils.py 2008-06-20 01:18:18 UTC (rev 87584)
@@ -14,14 +14,27 @@
import datetime
import logging
import sys
+import time
+import ZEO.Exceptions
+import ZODB.POSException
import rwproperty
import persistent
+import zope.minmax
import zc.dict
import pytz
import zope.bforest.periodic
+EXPLOSIVE_ERRORS = (SystemExit, KeyboardInterrupt)
+
+SYSTEM_ERRORS = (ZEO.Exceptions.ClientDisconnected,
+ ZODB.POSException.POSKeyError)
+
+INITIAL_BACKOFF = 5
+MAX_BACKOFF = 60
+BACKOFF_INCREMENT = 5
+
def simpleWrapper(name):
# notice use of "simple" in function name! A sure sign of trouble!
def wrapper(self, *args, **kwargs):
@@ -47,39 +60,12 @@
def __parent__(self, value):
self._z_parent__ = None
+# for legacy databases
+Atom = zope.minmax.Maximum
-class Atom(persistent.Persistent):
- def __init__(self, value):
- self.value = value
- def __getstate__(self):
- return self.value
+class Dict(zc.dict.Dict, Base):
- def __setstate__(self, state):
- self.value = state
-
-class AtomDescriptor(object):
- def __init__(self, name, initial=None):
- self.name = name
- self.initial = initial
-
- def __get__(self, obj, klass=None):
- if obj is None:
- return self
- return obj.__dict__[self.name].value
-
- def __set__(self, obj, value):
- obj.__dict__[self.name].value = value
-
- def initialize(self, obj):
- obj.__dict__[self.name] = Atom(self.initial)
-
-def createAtom(name, initial):
- sys._getframe(1).f_locals[name] = AtomDescriptor(name, initial)
-
-
-class Dict(zc.dict.Dict, Base):
-
copy = None # mask
def __setitem__(self, key, value):
@@ -236,3 +222,114 @@
def __len__(self):
return len(self._data)
+
+def never_fail(call, identifier, tm):
+ # forever for TransactionErrors; forever, with backoff, for anything else
+ trans_ct = 0
+ backoff_ct = 0
+ backoff = INITIAL_BACKOFF
+ res = None
+ while 1:
+ try:
+ res = call()
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ trans_ct += 1
+ if not trans_ct % 5:
+ log.warning(
+ '%d consecutive transaction errors while %s',
+ trans_ct, identifier, exc_info=True)
+ res = None
+ except EXPLOSIVE_ERRORS:
+ tm.abort()
+ raise
+ except Exception, e:
+ if isinstance(e, SYSTEM_ERRORS):
+ level = logging.ERROR
+ else:
+ level = logging.CRITICAL
+ tm.abort()
+ backoff_ct += 1
+ if backoff_ct == 1:
+ # import pdb; pdb.set_trace()
+ log.log(level,
+ 'first error while %s; will continue in %d seconds',
+ identifier, backoff, exc_info=True)
+ elif not backoff_ct % 10:
+ log.log(level,
+ '%d consecutive errors while %s; '
+ 'will continue in %d seconds',
+ backoff_ct, identifier, backoff, exc_info=True)
+ res = None
+ time.sleep(backoff)
+ backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
+ else:
+ return res
+
+def wait_for_system_recovery(call, identifier, tm):
+ # forever for TransactionErrors; forever, with backoff, for SYSTEM_ERRORS
+ trans_ct = 0
+ backoff_ct = 0
+ backoff = INITIAL_BACKOFF
+ res = None
+ while 1:
+ try:
+ res = call()
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ trans_ct += 1
+ if not trans_ct % 5:
+ log.warning(
+ '%d consecutive transaction errors while %s',
+                    trans_ct, identifier, exc_info=True)
+ res = None
+ except EXPLOSIVE_ERRORS:
+ tm.abort()
+ raise
+ except SYSTEM_ERRORS:
+ tm.abort()
+ backoff_ct += 1
+ if backoff_ct == 1:
+ log.error('first error while %s; will continue in %d seconds',
+ identifier, backoff, exc_info=True)
+ elif not backoff_ct % 5:
+ log.error('%d consecutive errors while %s; '
+ 'will continue in %d seconds',
+ backoff_ct, identifier, backoff, exc_info=True)
+ res = None
+ time.sleep(backoff)
+ backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
+ except:
+ log.error('Error while %s', identifier, exc_info=True)
+ tm.abort()
+ return zc.twist.Failure()
+ else:
+ return res
+
+def try_transaction_five_times(call, identifier, tm):
+ ct = 0
+ res = None
+ while 1:
+ try:
+ res = call()
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ ct += 1
+ if ct >= 5:
+ log.error('Five consecutive transaction errors while %s',
+ identifier, exc_info=True)
+ res = zc.twist.Failure()
+ else:
+ continue
+ except EXPLOSIVE_ERRORS:
+ tm.abort()
+ raise
+ except:
+ tm.abort()
+ log.error('Error while %s', identifier, exc_info=True)
+ res = zc.twist.Failure()
+ return res