[Checkins]
SVN: zc.async/branches/dev/ things are a bit in disarray,
but want to save work,
so checking in on a branch. branch started from trunk rev 86847.
Gary Poster
gary at zope.com
Mon May 19 21:08:43 EDT 2008
Log message for revision 86848:
things are a bit in disarray, but want to save work, so checking in on a branch. branch started from trunk rev 86847.
Changed:
U zc.async/branches/dev/buildout.cfg
U zc.async/branches/dev/src/zc/async/TODO.txt
U zc.async/branches/dev/src/zc/async/catastrophes.txt
U zc.async/branches/dev/src/zc/async/dispatcher.py
U zc.async/branches/dev/src/zc/async/job.py
U zc.async/branches/dev/src/zc/async/queue.py
U zc.async/branches/dev/src/zc/async/utils.py
-=-
Modified: zc.async/branches/dev/buildout.cfg
===================================================================
--- zc.async/branches/dev/buildout.cfg 2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/buildout.cfg 2008-05-20 01:08:43 UTC (rev 86848)
@@ -13,13 +13,13 @@
[test]
recipe = zc.recipe.testrunner
eggs = zc.async
-defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1'.split()
+defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1 --auto-color'.split()
working-directory = ${buildout:directory}
[z3test]
recipe = zc.recipe.testrunner
eggs = zc.async [z3]
-defaults = "--tests-pattern z3tests --exit-with-status -1".split()
+defaults = "--tests-pattern z3tests --exit-with-status -1 --auto-color".split()
[interpreter]
Modified: zc.async/branches/dev/src/zc/async/TODO.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/TODO.txt 2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/TODO.txt 2008-05-20 01:08:43 UTC (rev 86848)
@@ -1,11 +1,60 @@
Bugs and improvements:
-- need retry tasks, particularly for callbacks
+- need retry tasks, particularly for callbacks. ``retry_count`` affects aborts
+ and transaction failures? None == infinity, which is what callbacks use?
+ Should retry have a cleanup function?
- need CRITICAL logs for callbacks
-- when database went away, and then came back, async didn't come back.
+- when database went away, and then came back, async didn't come back. Fix
+ (and also reconsider retry behavior in Dispatcher._commit and
+ AgentThreadPool.perform_thread).
+Traceback (most recent call last):
+ File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/dispatcher.py", line 321, in _commit
+ transaction.commit()
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_manager.py", line 93, in commit
+ return self.get().commit()
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 325, in commit
+ self._commitResources()
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 422, in _commitResources
+ rm.tpc_begin(self)
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/Connection.py", line 525, in tpc_begin
+ self._normal_storage.tpc_begin(transaction)
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 1079, in tpc_begin
+ self._server.tpc_begin(id(txn), txn.user, txn.description,
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 86, in __getattr__
+ raise ClientDisconnected()
+ClientDisconnected
+
+Exception in thread Thread-5:
+Traceback (most recent call last):
+ File "/opt/cleanpython24/lib/python2.4/threading.py", line 442, in __bootstrap
+ self.run()
+ File "/opt/cleanpython24/lib/python2.4/threading.py", line 422, in run
+ self.__target(*self.__args, **self.__kwargs)
+ File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/dispatcher.py", line 153, in perform_thread
+ job() # this does the committing and retrying, largely
+ File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 290, in __call__
+ return self._call_with_retry(
+ File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 321, in _call_with_retry
+ res = self._complete(zc.twist.Failure(), tm)
+ File "/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py", line 340, in _complete
+ tm.commit()
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_manager.py", line 93, in commit
+ return self.get().commit()
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 325, in commit
+ self._commitResources()
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/_transaction.py", line 422, in _commitResources
+ rm.tpc_begin(self)
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/Connection.py", line 525, in tpc_begin
+ self._normal_storage.tpc_begin(transaction)
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 1079, in tpc_begin
+ self._server.tpc_begin(id(txn), txn.user, txn.description,
+ File "/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZEO/ClientStorage.py", line 86, in __getattr__
+ raise ClientDisconnected()
+ClientDisconnected
+
- be even more pessimistic about memory for saved polls and job info in
dispatcher.
Modified: zc.async/branches/dev/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/catastrophes.txt 2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/catastrophes.txt 2008-05-20 01:08:43 UTC (rev 86848)
@@ -1,417 +1,537 @@
-Catastrophes
-============
+Recovering from Catastrophes
+============================
-Sometimes bad things happen in the course of processing tasks. You might have
-a MemoryError while processing your main job, or some other failure might
-happen. That's bad enough. Of course, you can register some callbacks to
-handle the error, to do what you need to recover.
+------------
+Introduction
+------------
-But then what if the callback itself fails? Perhaps the situation that caused
-the main job to fail with a MemoryError will let the callback start, but not
-complete. Then when a sibling dispatcher handles the incomplete job, the
-callback will fail.
+Sometimes bad things happen in the course of processing tasks. What might go
+wrong? How does zc.async handle these errors? What are your responsibilities?
-You, the user, do have some responsibilities. Callbacks should be very fast and
-light. If you want to do something that takes a long time, or might take a long
-time, have your callback put a new job in a queue for the long job. The
-callback itself should then complete, quickly out of the way.
+First, what might go wrong?
-However, zc.async also has important responsibilities.
+- zc.async could have a problem while polling for jobs. We'll call this a
+ "polling exception."
-This document examines catastrophes like the ones outlined above, to show how
-zc.async handles them, and how you can configure zc.async for these situations.
-Other documents in zc.async, such as the "Dead Dispatchers" section of
-queue.txt, look at this with some isolation and stubs; this document uses a
-complete zc.async set up to examine the system holistically [#setUp]_.
+- zc.async could have a problem while performing a particular job. We'll call
+ this a "job-related exception."
-These are the scenarios we'll contemplate:
+For the purpose of this discussion, we will omit the possibility that zc.async
+has a bug. That is certainly a possibility, but the recovery story is not
+predictable, and if we knew of a bug, we'd try to fix it, rather than discuss
+it here!
-- The system has a single dispatcher. The dispatcher is working on a job with a
- callback. The dispatcher dies, and then restarts, cleaning up. We'll do two
- variants, one with a graceful shutdown and one with a hard crash.
+Exploring polling exceptions and job-related exceptions will illuminate the
+more specific catastrophes you may encounter, and show how your code and
+zc.async's can work together to handle them. We'll discuss each, then drill
+down into some specific scenarios.
-- The system has two dispatchers. One dispatcher is working on a job with a
- callback, and then dies. The other dispatcher cleans up.
+Polling Exceptions
+------------------
-- The system has a single dispatcher. The dispatcher is working on a job, and
- successfully completes it. The callback begins, and then fails.
+Polling exceptions are, at least in theory, the least of your worries. You
+shouldn't have to worry about them; and if you do, it is probably a basic
+configuration problem that you need to address, such as making sure that the
+dispatcher process has access to the needed databases and software, or making
+sure that the dispatcher process is run by daemonizing software that will
+restart it if needed, such as zdaemon (http://pypi.python.org/pypi/zdaemon) or
+supervisor (http://supervisord.org/).
-- The system has a single dispatcher. The dispatcher is working on a job, and
- successfully completes it. The callback begins, and then the dispatcher
- dies.
+zc.async is largely responsible for dealing with polling exceptions. What does
+it have to handle?
-- The system has a single dispatcher. The database goes away, and then comes
- back.
+- The process running the poll ends, perhaps in the middle of a poll.
--------------------------------------------------
-Dispatcher Dies Gracefully While Performing a Job
--------------------------------------------------
+- zc.async cannot commit a transaction during the poll, for instance because of
+ a ConflictError, or because the database is unavailable.
-First let's consider how a failed job with a callback or two is handled when
-the dispatcher dies.
+What needs to happen to handle these problems?
-Here we start a job.
+Process Ends
+............
- >>> import zope.component
- >>> import threading
- >>> import transaction
- >>> import zc.async.interfaces
- >>> import zc.async.testing
- >>> import zc.async.dispatcher
+If the process ends, your daemonizing front-end (zdaemon, supervisor, etc.)
+needs to restart it. The ZODB will discard incomplete transaction data, if any.
- >>> queue = root[zc.async.interfaces.KEY]['']
- >>> lock = threading.Lock()
- >>> lock.acquire()
- True
- >>> def wait_for_me():
- ... lock.acquire()
- ... lock.release() # so we can use the same lock again later
- ... raise SystemExit() # this will cause the worker thread to exit
- ...
- >>> def handle_error(result):
- ... return '...I handled the error...'
- ...
- >>> job = queue.put(wait_for_me)
- >>> callback_job = job.addCallbacks(failure=handle_error)
- >>> transaction.commit()
- >>> dispatcher = zc.async.dispatcher.get()
- >>> poll = zc.async.testing.get_poll(dispatcher)
- >>> wait_for_start(job)
+The only thing a zc.async dispatcher needs to handle is cleanup.
-In this scenario, ``wait_for_me`` is a job that will "unexpectedly" be lost
-while the dispatcher stops working. ``handle_error`` is the hypothetical
-handler that should be called if the ``wait_for_me`` job fails.
+- Ideally it will be able to deactivate its record in the ZODB during the
+ process shutdown.
-The job has started. Now, the dispatcher suddenly dies without the thread
-performing ``wait_for_me`` getting a chance to finish. For our first example,
-let's give the dispatcher a graceful exit. The dispatcher gets a chance to
-clean up its dispatcher agents, and job.fail() goes into the queue.
+- Instead, if it was a "hard crash" that didn't allow deactivation, a sibling
+ dispatcher will realize that the dispatcher is down and deactivate it.
- >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
- >>> wait_to_deactivate(dispatcher)
- >>> _ = transaction.begin()
- >>> job.status == zc.async.interfaces.ACTIVE
- True
- >>> len(queue)
- 1
- >>> fail_job = queue[0]
- >>> fail_job
- <zc.async.job.Job (oid 51, db 'unnamed') ``zc.async.job.Job (oid 30, db 'unnamed') :fail()``>
- >>> queue[0].callable
- <bound method Job.fail of <zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.wait_for_me()``>>
+- Or, finally, if it was a hard crash without a sibling, and the daemon
+ restarts a process for the original dispatcher instance, the new process
+ needs to realize that the old process is dead, not competing with it.
-Now when the process starts back up again, our callback will be performed.
+Transaction Error
+.................
- >>> old_dispatcher = dispatcher
- >>> zc.async.dispatcher.clear()
- >>> zc.async.subscribers.ThreadedDispatcherInstaller(
- ... poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
- >>> dispatcher = zc.async.dispatcher.get()
- >>> zc.async.testing.wait_for_result(fail_job)
- >>> job.status == zc.async.interfaces.COMPLETED
- True
- >>> job.result
- <zc.twist.Failure zc.async.interfaces.AbortedError>
- >>> callback_job.status == zc.async.interfaces.COMPLETED
- True
- >>> callback_job.result
- '...I handled the error...'
+If the poll gets a conflict error, it should simply abort and retry the poll,
+forever, with a small backoff.
-So, our callback had a chance to do whatever it thought appropriate--in this
-case, simply returning a string--once the dispatcher got back online
-[#cleanup1]_.
+If the database goes away (perhaps the ZEO server goes down for a bit, and the
+ZEO client to which the dispatcher is connected is trying to reconnect) it
+should gracefully try to wait for the database to return, and resume when it
+does.
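+
+zc.async's actual loop is internal; the following is only an illustrative
+sketch of the abort-and-retry behavior just described (``poll_once`` is a
+hypothetical stand-in for the real poll logic, not part of zc.async's API)::
+
+    import time
+
+    import transaction
+    import ZEO.Exceptions
+    import ZODB.POSException
+
+    def robust_poll(poll_once, backoff=0.1, max_backoff=60):
+        while True:
+            try:
+                poll_once()
+                transaction.commit()
+                return
+            except (ZODB.POSException.ConflictError,
+                    ZEO.Exceptions.ClientDisconnected):
+                transaction.abort()  # discard the failed work...
+                time.sleep(backoff)  # ...and retry forever, backing off
+                backoff = min(backoff * 2, max_backoff)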
-------------------------------------------------
-Dispatcher Crashes "Hard" While Performing a Job
-------------------------------------------------
+Other, more dramatic errors, such as POSKey errors, are generally considered to
+be out of zc.async's domain and control. It should ideally continue to try to
+resume as long as the process is alive, in case somehow the situation improves,
+but this may be difficult and the expectations for zc.async's recovery are
+lower than with ConflictErrors and ClientDisconnected errors.
-Our next catastrophe only changes one aspect to the previous one: the
-dispatcher does not stop gracefully, and does not have a chance to clean up its
-active jobs. It is a "hard" crash.
+Summary of Polling Exceptions
+.............................
-To show this, we will start a job, simulate the dispatcher dying "hard," and
-restart it so it clean up.
+To repeat, then, polling exceptions have two basic scenarios.
-So, first we start a long-running job in the dispatcher.
+If a dispatcher process ends, it needs to deactivate its record in the ZODB, or
+let another process know to deactivate it.
- >>> lock.acquire()
- True
- >>> job = queue.put(wait_for_me)
- >>> callback_job = job.addCallbacks(failure=handle_error)
- >>> transaction.commit()
- >>> dispatcher = zc.async.dispatcher.get()
- >>> poll = zc.async.testing.get_poll(dispatcher)
- >>> wait_for_start(job)
+If a ZODB.POSException.ConflictError occurs, retry forever with a small
+backoff; or if ZEO.Exceptions.ClientDisconnected occurs, retry forever with a
+small backoff, waiting for the database to come back.
-Now we'll "crash" the dispatcher.
+Most anything else will ideally keep zc.async attempting to re-poll, but it may
+not happen: expectations are lower.
- >>> dispatcher.activated = False # this will make polling stop, without
- ... # cleanup
- >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
- >>> dispatcher.thread.join(3)
+Job-Related Exceptions
+----------------------
-Hard crashes can be detected because the dispatchers write datetimes to the
-database every few polls. A given dispatcher instance does this for each queue
-on a ``DispatcherAgents`` object available in ``queue.dispatchers[UUID]``,
-where ``UUID`` is the uuid of that dispatcher.
+What about job-related exceptions? Responsibility for handling job-related
+exceptions is shared between your code and zc.async's. What might happen?
-The ``DispatcherAgents`` object has four pertinent attributes:
-``ping_interval``, ``ping_death_interval``, ``last_ping``, and ``dead``. About
-every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is supposed
-to write a ``datetime`` to ``last_ping``. If the ``last_ping`` plus the
-``ping_death_interval`` (also a ``timedelta``) is older than now, the
-dispatcher is considered to be ``dead``, and old jobs should be cleaned up.
+- Your job might fail internally.
-The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
-defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
-least two or three poll intervals (``zc.async.dispatcher.get().poll_interval``)
-greater than the ``ping_interval``.
+- The process running your task ends before completing your task.
-The ping hasn't timed out yet, so the dispatcher isn't considered dead yet.
+- zc.async cannot commit a transaction after your task completes, for instance
+ because of a ConflictError, or because the database is unavailable.
- >>> _ = transaction.begin()
- >>> import zc.async.instanceuuid
- >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
- >>> da.ping_death_interval
- datetime.timedelta(0, 60)
- >>> da.ping_interval
- datetime.timedelta(0, 30)
- >>> bool(da.activated)
- True
- >>> da.dead
- False
+What should occur to handle these problems?
-Therefore, the job is still sitting around in the dispatcher's pile in the
-database (the ``main`` key is for the ``main`` agent installed in this
-dispatcher in the set up for these examples).
+Job Fails
+.........
- >>> job in da['main']
- True
- >>> job.status == zc.async.interfaces.ACTIVE
- True
+As discussed elsewhere, if your job fails in your own code, this is mostly
+your responsibility. You should handle possible errors both within your job's
+code, and in callbacks, as appropriate. zc.async's responsibilities are merely
+to report.
-Let's start our dispatcher up again.
+By default, zc.async will log a failure of a job entered in a queue at the
+"ERROR" level in the ``zc.async.events`` log, and it will log a failure of a
+callback or other internal job at the "CRITICAL" level. This can be controlled
+per-process and per-job, as we'll see below. These tracebacks include
+information about the local and global variables for each frame in the stack,
+which can be useful to deduce the problem that occurred.
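+
+For instance, a minimal logging configuration that captures these tracebacks
+might look like the following sketch (the handler and file name are
+illustrative, not required by zc.async)::
+
+    import logging
+
+    logging.getLogger('zc.async.events').addHandler(
+        logging.FileHandler('async-events.log'))
+    logging.getLogger('zc.async.events').setLevel(logging.DEBUG)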
- >>> old_dispatcher = dispatcher
- >>> zc.async.dispatcher.clear()
- >>> zc.async.subscribers.ThreadedDispatcherInstaller(
- ... poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
- >>> dispatcher = zc.async.dispatcher.get()
+zc.async also includes a ``Failure`` object on the job as a result, to let you
+react to the problem in a callback, and analyze it later. This is discussed in
+detail elsewhere.
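+
+For illustration only (the names here are hypothetical, and this sketch is
+not part of this document's tests), reacting to such a ``Failure`` result in
+a callback might look like::
+
+    import twisted.python.failure
+
+    def work():
+        raise ValueError('something went wrong')
+
+    def handle_result(result):
+        if isinstance(result, twisted.python.failure.Failure):
+            return 'handled: %s' % (result.getErrorMessage(),)
+        return result
+
+    job = queue.put(work)
+    callback = job.addCallback(handle_result)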
-Initially, it's going to be a bit confused, because it sees that the
-DispatcherAgents object is ``activated``, and not ``dead``. It can't tell if
-there's another process using its same UUID, or if it is looking at the result
-of a hard crash.
+The RetryPolicy, discussed later, can try to react to exceptions from the job's
+code, but the default policies included in the package do not.
- >>> zc.async.testing.wait_for_result(job, seconds=1)
- Traceback (most recent call last):
- ...
- AssertionError: job never completed
- >>> zc.async.testing.get_poll(dispatcher, seconds=1)
- {'': None}
- >>> for r in reversed(event_logs.records):
- ... if r.levelname == 'ERROR':
- ... break
- ... else:
- ... assert False, 'did not find log'
- ...
- >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- UUID ... already activated in queue (oid 4): another process?
- (To stop poll attempts in this process, set
- ``zc.async.dispatcher.get().activated = False``. To stop polls
- permanently, don't start a zc.async.dispatcher!)
+Process Ends
+............
+If a process ends while it is performing a job, that is similar, in large
+part, to the process ending while polling: we need to restart the process,
+and realize that we had started the job. But should we restart the job, or
+abort it?
-To speed up the realization of our dispatcher that the previous activation is
-``dead``, we'll set the ping_death_interval to just one second.
+Answering this question is a matter of policy, and requires knowing what each
+job does.
- >>> _ = transaction.begin()
- >>> da.dead
- False
- >>> import datetime
- >>> da.ping_death_interval = datetime.timedelta(seconds=1)
- >>> da.dead
- True
- >>> bool(da.activated)
- True
- >>> transaction.commit()
+Generally, if a job is fully transactional, such as writing something to the
+ZODB, and the job has not timed out yet, you'll want to restart it. You might
+want to restart some reasonably large number of times, and then suspect that,
+since you can't seem to finish the job, maybe the job is causing the process
+to die, and you should abort. Or perhaps you want to restart forever.
-After the next poll, the dispatcher will have cleaned up its old tasks in the
-same way we saw in the previous example. The job's ``fail`` method will be
-called, and the callback will be performed. The DispatcherAgents object is
-no longer dead, because it is tied to the new instance of the dispatcher.
+If the job isn't transactional, such as communicating with an external service,
+you might want to abort the job, and set up some callbacks to handle the
+fallout.
- >>> poll = zc.async.testing.get_poll(dispatcher)
- >>> _ = transaction.begin()
- >>> job in da['main']
- False
- >>> bool(da.activated)
- True
- >>> da.dead
- False
- >>> fail_job = job.parent
- >>> fail_job
- <zc.async.job.Job (oid 78, db 'unnamed') ``zc.async.job.Job (oid 57, db 'unnamed') :fail()``>
+As we'll see below, zc.async defaults to guessing that jobs placed directly in
+a queue are transactional, and can be restarted up to ten times; and that jobs
+used as callbacks are also transactional, and should be restarted until they
+succeed. These defaults can be changed, and the behavior of an individual job
+can be overridden.
-Let's see it happen.
+These settings are controlled with a RetryPolicy, discussed below.
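+
+For instance, a sketch of a policy for a non-transactional job that should
+never be restarted after an interruption, building on the ``Retries`` base
+class added to ``job.py`` on this branch (the subclass name is hypothetical)::
+
+    import zc.async.job
+
+    class AbortOnInterruption(zc.async.job.Retries):
+        # never restart after a process death: the work is not
+        # transactional, and a retry could repeat external side effects.
+        max_interruptions = 0
+
+        def interrupted(self):
+            return False  # zc.async will then call the job's ``fail``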
- >>> zc.async.testing.wait_for_result(fail_job)
- >>> job.status == zc.async.interfaces.COMPLETED
- True
- >>> job.result
- <zc.twist.Failure zc.async.interfaces.AbortedError>
- >>> callback_job.status == zc.async.interfaces.COMPLETED
- True
- >>> callback_job.result
- '...I handled the error...'
+Transaction Error
+.................
-The dispatcher cleaned up its own "hard" crash.
+Handling transaction errors after processing a job is also similar to the
+handling of transaction errors for polling exceptions. ConflictErrors and
+ClientDisconnected errors should often cause jobs to be aborted and restarted.
+However, if the job is not transactional, such as communicating with an
+external service, a simple abort and retry may be hazardous. Also, many jobs
+should be stopped if they retry on ConflictError more than some heuristic
+threshold, on the logic that they may simply be doing something too
+problematic, and they are blocking other tasks from starting. But other jobs
+should be retried until they complete.
-[#cleanup2]_
+As mentioned above, zc.async defaults to guessing that jobs are transactional.
+ClientDisconnected errors are retried forever. Jobs placed in a queue retry
+ConflictErrors five times, while callbacks retry them forever, with a small
+backoff. These defaults can be changed, and the behavior of an individual
+job can be overridden, using the RetryPolicy described below.
------------------------------------------------------------------
-Dispatcher Crashes "Hard" While Performing a Job, Sibling Resumes
------------------------------------------------------------------
+While custom RetryPolicies can try to handle other transaction errors, they are
+generally considered to be out of zc.async's domain and control.
-Our next catastrophe is the same as the one before, except, after one
-dispatcher's hard crash, another dispatcher is around to clean up the dead
-jobs.
+Summary of Job-Related Exceptions
+.................................
-To show this, we will start a job, start a second dispatcher, simulate the
-first dispatcher dying "hard," and watch the second dispatcher clean up
-after the first one.
+If an exception occurs in your job's code, zc.async will log it as an ERROR
+if it is a main queue job, and as CRITICAL if it is a callback; and it will
+make the result of the call a ``Failure`` with error information, as shown
+elsewhere. Everything else is your responsibility, to be handled with
+try:except or try:finally blocks in your code, callbacks, or custom
+RetryPolicies.
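+
+For instance, a job might guard its own work like this illustrative sketch
+(``do_work`` and ``compensate`` are hypothetical)::
+
+    def careful_job():
+        try:
+            return do_work()
+        except Exception:
+            compensate()  # undo any external side effects
+            raise  # let zc.async log the error and store a ``Failure``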
-So, first we start a long-running job in the dispatcher as before.
+Process death, conflict errors, and ``ClientDisconnected`` errors all may need
+to be handled differently for different jobs. zc.async has a default policy for
+jobs placed in a queue, and for callback jobs. The default policy, a
+RetryPolicy, can be changed and can be set explicitly per-job.
- >>> lock.acquire()
- True
- >>> job = queue.put(wait_for_me)
- >>> callback_job = job.addCallbacks(failure=handle_error)
- >>> transaction.commit()
- >>> dispatcher = zc.async.dispatcher.get()
- >>> poll = zc.async.testing.get_poll(dispatcher)
- >>> wait_for_start(job)
+Your Responsibilities
+---------------------
-Now we'll start up an alternate dispatcher.
+As the author of a zc.async job, your responsibilities, then, are to handle
+your own exceptions; and to make sure that the retry policy for each job is
+appropriate. This is controlled with an IRetryPolicy, as shown below.
- >>> import uuid
- >>> alt_uuid = uuid.uuid1()
- >>> zc.async.subscribers.ThreadedDispatcherInstaller(
- ... poll_interval=0.5, uuid=alt_uuid)(
- ... zc.async.interfaces.DatabaseOpened(db))
- >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
+As someone configuring a running dispatcher, you need to make sure that you
+give the dispatcher the necessary access to databases and software to perform
+your jobs, and you need to review (and rotate!) your logs.
-We're also going to set the main dispatcher's ``ping_death_interval`` back to
-60 seconds so we can see some polls in the alternate dispatcher before it gets
-around to cleaning up.
+zc.async's Responsibilities
+---------------------------
- >>> da.ping_death_interval = datetime.timedelta(seconds=60)
- >>> transaction.commit()
+zc.async needs to keep polling robust in the face of restarts, ConflictErrors,
+and ClientDisconnected errors. It needs to give your code a chance to decide
+what to do in these circumstances, and to log your errors.
-Now we'll "crash" the dispatcher.
+Retry Policies
+--------------
- >>> dispatcher.activated = False # this will make polling stop, without
- ... # cleanup
- >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
- >>> dispatcher.thread.join(3)
+The rest of the document uses scenarios to illustrate how zc.async handles
+errors, and how you might want to configure retry policies. A retry policy is
+given a job and, when pertinent, exception information, and determines whether
+zc.async should retry the job. zc.async comes with three retry policies.
-As discussed in the previous example, the polling hasn't timed out yet, so the
-alternate dispatcher can't know that the first one is dead. Therefore, the job
-is still sitting around in the old dispatcher's pile in the database.
+- One is the absence of a retry policy: do not retry this job. A value of
+ ``None`` represents this policy. This is appropriate for tasks that are
+ not transactional. They typically need to be handled with a callback.
- >>> _ = transaction.begin()
- >>> bool(da.activated)
+- The default is a retry policy that retries interruptions (such as a process
+  restart) up to ten times, retries a ConflictError up to five times, and
+  retries ClientDisconnected errors forever, with a small backoff.
+
+- One is a retry policy that never gives up on restarts, ConflictErrors, or
+ ClientDisconnected errors: it has no limit but keeps trying forever.
+
+Scenarios
+---------
+
+We'll examine a number of scenarios, many of which combine problems in polling
+and in jobs. In these scenarios, we'll refer to a job that was placed directly
+in a queue as a "queue job". A job that was performed in any other way is a
+"callback job," because most often these are callbacks.
+
+- Polling errors
+
+ * The system is polling and gets a ConflictError.
+
+ * The system is polling and gets a ClientDisconnected error.
+
+- Internal job errors
+
+ * A worker process is polling and working on a queue job. The queue job fails
+ internally.
+
+ * A worker process is polling and working on a callback job. The callback job
+ fails internally.
+
+- Default retry policy
+
+ * A worker process is working on a job with the default retry policy and gets
+ a ConflictError during the commit.
+
+ * A worker process is working on a job with the default retry policy and gets
+ a ClientDisconnected error.
+
+ * A worker process is working on a job with the default retry policy. The
+ process dies gracefully and restarts.
+
+ * Like the previous scenario, a worker process is working on a job with the
+ default retry policy. The process crashes hard (does not die gracefully)
+ and restarts.
+
+ * Like the previous scenario, a worker process is working on a job with the
+ default retry policy. The process crashes hard (does not die gracefully)
+ and a sibling notices and takes over.
+
+- No retry policy
+
+ * A worker process is working on a job without a retry policy (that is,
+ zc.async should not retry it) and gets a ConflictError during the commit.
+
+ * A worker process is working on a job without a retry policy (that is,
+ zc.async should not retry it) and gets a ClientDisconnected error.
+
+ * A worker process is working on a job with no retry policy (that is,
+ zc.async should not retry it). The process dies gracefully and restarts.
+
+ * Like the previous scenario, a worker process is working on a job with no
+ retry policy (that is, zc.async should not retry it). The process crashes
+ hard (does not die gracefully) and restarts.
+
+ * Like the previous scenario, a worker process is working on a job with no
+ retry policy (that is, zc.async should not retry it). The process crashes
+ hard (does not die gracefully) and a sibling notices and takes over.
+
+- Retry forever policy
+
+ * A worker process is working on a job with the retry-forever policy and gets
+ a ConflictError during the commit.
+
+ * A worker process is working on a job with the retry-forever policy and gets
+ a ClientDisconnected error.
+
+ * A worker process is working on a job with the retry-forever policy. The
+ process dies gracefully and restarts.
+
+ * Like the previous scenario, a worker process is working on a job with the
+ retry-forever policy. The process crashes hard (does not die gracefully)
+ and restarts.
+
+ * Like the previous scenario, a worker process is working on a job with the
+ retry-forever policy. The process crashes hard (does not die gracefully)
+ and a sibling notices and takes over.
+
+We will close with customizations:
+
+- custom retry policies, particularly for non-transactional tasks;
+
+- changing the default retry policy, per-process and per-agent; and
+
+- changing the default log level for queue jobs, callback jobs, and per-job.
+
+-------------------------
+Scenarios: Polling Errors
+-------------------------
+
+ConflictError
+-------------
+
+A common place for a conflict error is with two dispatchers trying to claim the
+same job from the queue. This example will mimic that situation.
+
+Imagine we have a full set up with a dispatcher, agent, and queue. [#setUp]_
+We'll actually replace the agent's chooser with one that behaves badly: it
+blocks, waiting for our lock.
+
+ >>> import threading
+ >>> lock1 = threading.Lock()
+ >>> lock2 = threading.Lock()
+ >>> lock1.acquire()
True
- >>> da.dead
- False
- >>> job.status == zc.async.interfaces.ACTIVE
- True
- >>> alt_poll_1 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> lock2.acquire()
+    True
+ >>> def acquireLockAndchooseFirst(agent):
+ ... res = agent.queue.claim()
+ ... if res is not None:
+ ... lock2.release()
+ ... lock1.acquire()
+ ... return res
+ ...
+ >>> import zc.async.instanceuuid
+ >>> import zc.async.interfaces
+ >>> import transaction
>>> _ = transaction.begin()
- >>> job in da['main']
- True
- >>> bool(da.activated)
- True
- >>> da.dead
- False
- >>> alt_poll_2 = zc.async.testing.get_poll(alt_dispatcher)
+ >>> queues = root[zc.async.interfaces.KEY]
+ >>> queue = queues['']
+ >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+ >>> agent = da['main']
+ >>> agent.chooser = acquireLockAndchooseFirst
+ >>> def returnSomething():
+ ... return 42
+ ...
+ >>> job = queue.put(returnSomething)
+ >>> transaction.commit()
+
+Now, when the agent tries to get our job, we'll start and commit another
+transaction that removes it from the queue. This will generate a conflict
+error for the poll's thread and transaction, because it cannot also remove the
+same job.
+
+    >>> lock2.acquire()
+    True
>>> _ = transaction.begin()
- >>> job in da['main']
+ >>> job is queue.pull()
True
- >>> bool(da.activated)
- True
- >>> da.dead
- False
+ >>> transaction.commit()
+ >>> lock1.release()
-Above, the ping_death_interval was returned to the default of 60 seconds. To
-speed up the realization of our second dispatcher that the first one is dead,
-we'll set the ping_death_interval back down to just one second.
+However, the ConflictError is handled, and polling continues.
- >>> da.ping_death_interval
- datetime.timedelta(0, 60)
- >>> import datetime
- >>> da.ping_death_interval = datetime.timedelta(seconds=1)
- >>> da.dead
+ >>> _ = transaction.begin()
+ >>> import zc.async.agent
+ >>> agent.chooser = zc.async.agent.chooseFirst
+ >>> transaction.commit()
+ >>> import zc.async.dispatcher
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> import zc.async.testing
+ >>> zc.async.testing.get_poll(dispatcher)
+
+And if we put the job back, it will be performed.
+
+ >>> job is queue.put(job)
True
- >>> bool(da.activated)
- True
>>> transaction.commit()
+ >>> zc.async.testing.wait_for_result(job)
+ 42
-After the second dispatcher gets a poll--a chance to notice--it will have
-cleaned up the first dispatcher's old tasks in the same way we saw in the
-previous example. The job's ``fail`` method will be called, and the callback
-will be performed.
+Client Disconnected
+-------------------
- >>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
- >>> _ = transaction.begin()
- >>> job in da['main']
- False
- >>> bool(da.activated)
- False
- >>> da.dead
+The story is very similar if the ZEO connection goes away for a while. We'll
+mimic a ZEO ClientDisconnected error by monkeypatching
+transaction.TransactionManager.commit.
+
+ >>> lock1.locked()
True
- >>> fail_job = job.parent
- >>> fail_job
- <zc.async.job.Job (oid 121, db 'unnamed') ``zc.async.job.Job (oid 84, db 'unnamed') :fail()``>
- >>> zc.async.testing.wait_for_result(fail_job)
- >>> job.status == zc.async.interfaces.COMPLETED
+ >>> lock2.locked()
True
- >>> job.result
- <zc.twist.Failure zc.async.interfaces.AbortedError>
- >>> callback_job.status == zc.async.interfaces.COMPLETED
- True
- >>> callback_job.result
- '...I handled the error...'
-The sibling, then, was able to clean up the mess left by the "hard" crash of
-the first dispatcher.
+ >>> job = queue.put(returnSomething)
+ >>> transaction.commit()
-[#cleanup3]_
+ >>> lock2.acquire()
+ >>> import ZEO.Exceptions
+ >>> def commit(self):
+ ... raise ZEO.Exceptions.ClientDisconnected()
+ ...
+ >>> import transaction
+    >>> old_commit = transaction.TransactionManager.commit
+    >>> transaction.TransactionManager.commit = commit
+ >>> lock1.release()
+ >>> zc.async.testing.get_poll(dispatcher)
+    >>> transaction.TransactionManager.commit = old_commit
+ >>> zc.async.testing.wait_for_result(job)
+ 42
---------------
-Callback Fails
---------------
+Here's another variant that mimics being unable to read the storage during a
+poll, and then recuperating.
--------------------------------
-Dispatcher Dies During Callback
--------------------------------
+    >>> error_raised = False
+    >>> def raiseDisconnectedThenChooseFirst(agent):
+    ...     global error_raised
+    ...     if not error_raised:
+    ...         error_raised = True
+    ...         raise ZEO.Exceptions.ClientDisconnected()
+    ...     return agent.queue.claim()
+    ...
+ >>> agent.chooser = raiseDisconnectedThenChooseFirst
+ >>> def returnSomething():
+ ... return 42
+ ...
+ >>> job = queue.put(returnSomething)
+ >>> transaction.commit()
+ >>> zc.async.testing.get_poll(dispatcher)
+ >>> zc.async.testing.wait_for_result(job)
+ 42
------------------------------
-Database Disappears For Awhile
+Scenarios: Internal Job Errors
------------------------------
----------------------------------------------
-Other Catastrophes, And Your Responsibilities
----------------------------------------------
+Queue Job
+---------
-There are some catastrophes from which there are no easy fixes like these. For
-instance, imagine you have communicated with an external system, and gotten a
-reply that you have successfully made a transaction there, but then the
-dispatcher dies, or the database disappears, before you have a chance to commit
-the local transaction recording the success. Your code needs to see that
+Callback Job
+------------
-...multidatabase...
+-------------------------------
+Scenarios: Default Retry Policy
+-------------------------------
+ConflictError
+-------------
+
+ClientDisconnected
+------------------
+
+Graceful Shutdown
+-----------------
+
+Hard Crash
+----------
+
+Hard Crash and Sibling
+----------------------
+
+--------------------------
+Scenarios: No Retry Policy
+--------------------------
+
+ConflictError
+-------------
+
+ClientDisconnected
+------------------
+
+Graceful Shutdown
+-----------------
+
+Hard Crash
+----------
+
+Hard Crash and Sibling
+----------------------
+
+-------------------------------
+Scenarios: Retry-Forever Policy
+-------------------------------
+
+ConflictError
+-------------
+
+ClientDisconnected
+------------------
+
+Graceful Shutdown
+-----------------
+
+Hard Crash
+----------
+
+Hard Crash and Sibling
+----------------------
+
+--------------
+Customizations
+--------------
+
+Changing the Default Retry Policy
+---------------------------------
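+
+A sketch based on this branch's ``job.py``: a job's ``default_retry_policy``
+and ``retry_policy`` may each be an adapter name or a factory, and
+``retry_policy`` takes precedence when both are set (``some_task`` is
+hypothetical)::
+
+    job = queue.put(some_task)
+    job.retry_policy = 'callback'  # adapt the job by name to the policy
+                                   # that callbacks use by default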
+
+Creating a New Retry Policy
+---------------------------
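+
+A new policy implements the ``IRetries`` methods sketched in this branch's
+``job.py`` (where the interface is marked with an XXX to move into
+``zc.async.interfaces``). The following skeleton, whose name is hypothetical,
+never retries anything::
+
+    import persistent
+    import zope.component
+    import zope.interface
+    import zc.async.interfaces
+
+    class NeverRetry(persistent.Persistent):
+        zope.component.adapts(zc.async.interfaces.IJob)
+        zope.interface.implements(zc.async.interfaces.IRetries)
+
+        def __init__(self, job):
+            self.parent = self.__parent__ = job
+
+        def jobError(self, failure, data_cache):
+            return False  # do not retry errors raised by the job
+
+        def transactionError(self, failure, data_cache):
+            return False  # do not retry commit failures
+
+        def interrupted(self):
+            return False  # a dispatcher death fails the job
+
+        def updateData(self, data_cache):
+            pass  # no retry bookkeeping to persist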
+
+Changing the Log Level
+----------------------
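+
+A sketch based on the logging attributes in this branch's ``job.py``: queue
+jobs default to ``logging.ERROR`` and callbacks to ``logging.CRITICAL``, and
+both can be overridden per-job (``noisy_task`` is hypothetical)::
+
+    import logging
+
+    job = queue.put(noisy_task)
+    job.error_log_level = logging.WARNING  # quiet this job's failures
+
+The same override is available when adding callbacks, via the
+``error_log_level`` argument to ``addCallback`` and ``addCallbacks``.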
+
+
+
+
+
+
+
+
+
+
.. ......... ..
.. Footnotes ..
.. ......... ..
@@ -456,37 +576,4 @@
... break
... time.sleep(0.1)
... else:
- ... assert False, 'dispatcher never deactivated'
-
-.. [#cleanup1]
-
- >>> lock.release()
- >>> old_dispatcher.thread.join(3)
- >>> old_dispatcher.dead_pools[0].threads[0].join(3)
-
-.. [#cleanup2]
-
- >>> lock.release()
- >>> old_dispatcher.thread.join(3)
- >>> for queue_pools in old_dispatcher.queues.values():
- ... for name, pool in queue_pools.items():
- ... pool.setSize(0)
- ... for thread in pool.threads:
- ... thread.join(3)
- ...
- -3
-
-.. [#cleanup3]
-
- >>> lock.release()
- >>> dispatcher.thread.join(3)
- >>> for queue_pools in dispatcher.queues.values():
- ... for name, pool in queue_pools.items():
- ... pool.setSize(0)
- ... for thread in pool.threads:
- ... thread.join(3)
- ...
- -3
- >>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
- >>> alt_dispatcher.thread.join(3)
- >>> alt_dispatcher.dead_pools[0].threads[0].join(3)
+ ... assert False, 'dispatcher never deactivated'
\ No newline at end of file
Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py 2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py 2008-05-20 01:08:43 UTC (rev 86848)
@@ -152,16 +152,6 @@
local.job = job
try:
job() # this does the committing and retrying, largely
- except ZODB.POSException.TransactionError:
- transaction.abort()
- while 1:
- job.fail()
- try:
- transaction.commit()
- except ZODB.POSException.TransactionError:
- transaction.abort() # retry forever (!)
- else:
- break
except zc.async.interfaces.BadStatusError:
transaction.abort()
zc.async.utils.log.error( # notice, not tracelog
@@ -169,6 +159,7 @@
if job.status == zc.async.interfaces.CALLBACKS:
job.resumeCallbacks() # moves the job off the agent
else:
+ count = 0
while 1:
status = job.status
if status == zc.async.interfaces.COMPLETED:
@@ -180,17 +171,38 @@
job.fail() # moves the job off the agent
try:
transaction.commit()
- except ZODB.POSException.TransactionError:
+                            except (ZODB.POSException.TransactionError,
+                                    ZODB.POSException.POSError):
+                                count += 1
+                                if count and not count % 10:
+                                    zc.async.utils.log.critical(
+                                        'frequent database errors! '
+                                        'I retry forever...',
+                                        exc_info=True)
+                                time.sleep(1)
transaction.abort() # retry forever (!)
else:
break
+ except zc.async.utils.EXPLOSIVE_ERRORS:
+ transaction.abort()
+ raise
+ except:
+ # all errors should have been handled by the job at
+ # this point, so anything other than BadStatusError,
+ # SystemExit and KeyboardInterrupt are bad surprises.
+ transaction.abort()
+ zc.async.utils.log.critical(
+ 'unexpected error', exc_info=True)
+ raise
# should come before 'completed' for threading dance
if isinstance(job.result, twisted.python.failure.Failure):
info['failed'] = True
info['result'] = job.result.getTraceback(
- elideFrameworkCode=True, detail='verbose')
+ elideFrameworkCode=True)
else:
info['result'] = repr(job.result)
+ if len(info['result']) > 10000:
+ info['result'] = (
+ info['result'][:10000] + '\n[...TRUNCATED...]')
info['completed'] = datetime.datetime.utcnow()
finally:
local.job = None
@@ -233,7 +245,7 @@
self.queue.put(None)
return size - old # size difference
-# this is mostly for testing
+# this is mostly for testing, though ``get`` comes in handy generally
_dispatchers = {}
@@ -249,6 +261,8 @@
clear = _dispatchers.clear
+# end of testing bits
+
class Dispatcher(object):
activated = False
@@ -292,57 +306,22 @@
self.dead_pools = []
def _getJob(self, agent):
- try:
- job = agent.claimJob()
- except zc.twist.EXPLOSIVE_ERRORS:
- transaction.abort()
- raise
- except:
- transaction.abort()
- zc.async.utils.log.error(
- 'Error trying to get job for UUID %s from '
- 'agent %s (oid %s) in queue %s (oid %s)',
- self.UUID, agent.name, agent._p_oid,
- agent.queue.name,
- agent.queue._p_oid, exc_info=True)
- return zc.twist.Failure()
- res = self._commit(
- 'Error trying to commit getting a job for UUID %s from '
- 'agent %s (oid %s) in queue %s (oid %s)' % (
- self.UUID, agent.name, agent._p_oid,
- agent.queue.name,
- agent.queue._p_oid))
- if res is None:
- # Successful commit
- res = job
+ identifier = (
+ 'getting job for UUID %s from agent %s (oid %d) '
+ 'in queue %s (oid %d)' % (
+ self.UUID, agent.name, ZODB.utils.u64(agent._p_oid),
+ agent.queue.name, ZODB.utils.u64(agent.queue._p_oid)))
+ res = zc.async.utils.try_transaction_five_times(
+ agent.claimJob, identifier, transaction)
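+        # (``try_transaction_five_times`` is a new helper in zc.async.utils on
+        # this branch; as its name and this usage suggest, it calls and
+        # commits, retrying transaction errors up to five times, and returns a
+        # Failure if it cannot succeed.)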
+ if isinstance(res, twisted.python.failure.Failure):
+ identifier = 'stashing failure on agent %s (oid %s)' % (
+ agent.name, ZODB.utils.u64(agent._p_oid))
+ def setFailure():
+ agent.failure = res
+ zc.async.utils.try_transaction_five_times(
+ setFailure, identifier, transaction)
return res
- def _commit(self, debug_string=''):
- retry = 0
- while 1:
- try:
- transaction.commit()
- except ZODB.POSException.TransactionError:
- transaction.abort()
- if retry >= 5:
- zc.async.utils.log.error(
- 'Repeated transaction error trying to commit in '
- 'zc.async: %s',
- debug_string, exc_info=True)
- return zc.twist.Failure()
- retry += 1
- except zc.twist.EXPLOSIVE_ERRORS:
- transaction.abort()
- raise
- except:
- transaction.abort()
- zc.async.utils.log.error(
- 'Error trying to commit: %s',
- debug_string, exc_info=True)
- return zc.twist.Failure()
- else:
- break
-
def poll(self):
poll_info = PollInfo()
started_jobs = []
@@ -358,29 +337,30 @@
queue.dispatchers.register(self.UUID)
da = queue.dispatchers[self.UUID]
if queue._p_oid not in self._activated:
- if da.activated:
- if da.dead:
- da.deactivate()
- else:
- zc.async.utils.log.error(
- 'UUID %s already activated in queue %s '
- '(oid %d): another process? (To stop '
- 'poll attempts in this process, set '
- '``zc.async.dispatcher.get().activated = '
- "False``. To stop polls permanently, don't "
- 'start a zc.async.dispatcher!)',
- self.UUID, queue.name,
- ZODB.utils.u64(queue._p_oid))
- continue
- da.activate()
- self._activated.add(queue._p_oid)
- # removed below if transaction fails
- res = self._commit(
- 'Error trying to commit activation of UUID %s in '
- 'queue %s (oid %s)' % (
- self.UUID, queue.name, queue._p_oid))
- if res is not None:
- self._activated.remove(queue._p_oid)
+ identifier = (
+ 'activating dispatcher UUID %s in queue %s (oid %d)' %
+ (self.UUID, queue.name, ZODB.utils.u64(queue._p_oid)))
+ def activate():
+ if da.activated:
+ if da.dead:
+ da.deactivate()
+ else:
+ zc.async.utils.log.error(
+ 'UUID %s already activated in queue %s '
+ '(oid %d): another process? (To stop '
+ 'poll attempts in this process, set '
+ '``zc.async.dispatcher.get().activated = '
+ "False``. To stop polls permanently, don't "
+ 'start a zc.async.dispatcher!)',
+ self.UUID, queue.name,
+ ZODB.utils.u64(queue._p_oid))
+ return False
+ da.activate()
+ return True
+ if zc.async.utils.try_transaction_five_times(
+ activate, identifier, transaction) is True:
+ self._activated.add(queue._p_oid)
+ else:
continue
queue_info = poll_info[queue.name] = {}
pools = self.queues.get(queue.name)
@@ -398,7 +378,7 @@
try:
agent_info['size'] = agent.size
agent_info['len'] = len(agent)
- except zc.twist.EXPLOSIVE_ERRORS:
+ except zc.async.utils.EXPLOSIVE_ERRORS:
raise
except:
agent_info['error'] = zc.twist.Failure()
@@ -419,17 +399,6 @@
if isinstance(job, twisted.python.failure.Failure):
agent_info['error'] = job
job = None
- try:
- agent.failure = res
- except zc.twist.EXPLOSIVE_ERRORS:
- raise
- except:
- transaction.abort()
- zc.async.utils.log.error(
- 'error trying to stash failure on agent')
- else:
- # TODO improve msg
- self._commit('trying to stash failure on agent')
else:
info = {'result': None,
'failed': False,
@@ -448,8 +417,10 @@
pool.queue.put(
(job._p_oid, dbname, info))
job = self._getJob(agent)
- queue.dispatchers.ping(self.UUID)
- self._commit('trying to commit ping')
+ identifier = 'committing ping for UUID %s' % (self.UUID,)
+ zc.async.utils.try_transaction_five_times(
+ lambda: queue.dispatchers.ping(self.UUID), identifier,
+ transaction)
if len(pools) > len(queue_info):
conn_delta = 0
for name, pool in pools.items():
@@ -539,13 +510,16 @@
try:
transaction.begin()
try:
- queues = self.conn.root().get(zc.async.interfaces.KEY)
- if queues is not None:
- for queue in queues.values():
- da = queue.dispatchers.get(self.UUID)
- if da is not None and da.activated:
- da.deactivate()
- self._commit('trying to tear down')
+ identifier = 'cleanly deactivating UUID %s' % (self.UUID,)
+ def deactivate_das():
+ queues = self.conn.root().get(zc.async.interfaces.KEY)
+ if queues is not None:
+ for queue in queues.values():
+ da = queue.dispatchers.get(self.UUID)
+ if da is not None and da.activated:
+ da.deactivate()
+ zc.async.utils.try_transaction_five_times(
+ deactivate_das, identifier, transaction)
finally:
transaction.abort()
self.conn.close()
Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py 2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/job.py 2008-05-20 01:08:43 UTC (rev 86848)
@@ -11,11 +11,14 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
+import time
import types
import datetime
+import logging
import BTrees.OOBTree
import ZODB.POSException
+import ZEO.Exceptions
import transaction.interfaces
import persistent
import persistent.list
@@ -74,6 +77,110 @@
elif status == zc.async.interfaces.CALLBACKS:
a.resumeCallbacks()
+class IRetries(zope.interface.Interface): # XXX move
+ def jobError(failure, data_cache):
+        """whether and how to retry after an error while performing the job.
+
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue. An empty timedelta means to
+        reschedule immediately, before any pending calls in the queue."""
+
+ def transactionError(failure, data_cache):
+ """whether to retry after trying to commit a job's successful result.
+
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue. An empty timedelta means to
+        reschedule immediately, before any pending calls in the queue."""
+
+ def interrupted():
+        """whether to retry after a dispatcher dies while the job is in
+        progress.
+
+        return boolean as to whether to retry, or a datetime or timedelta to
+        reschedule the job in the queue. An empty timedelta means to
+        reschedule immediately, before any pending calls in the queue."""
+
+ def updateData(data_cache):
+        """right before committing a job, the retry policy is given a chance
+        to stash information it has saved in the data_cache."""
+
+class Retries(persistent.Persistent): # default for '' IRetries
+ zope.component.adapts(zc.async.interfaces.IJob)
+ zope.interface.implements(zc.async.interfaces.IRetries)
+
+ # exceptions, data_cache key, max retry, initial backoff seconds,
+ # incremental backoff seconds, max backoff seconds
+ internal_exceptions = (
+ ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
+ None, 5, 5, 60),
+ ((ZODB.POSException.TransactionError,), 'transaction_error',
+ 5, 0, 0, 0),
+ )
+ transaction_exceptions = internal_exceptions
+ max_interruptions = 10
+
+ def __init__(self, job):
+ self.parent = self.__parent__ = job
+ self.data = BTrees.family32.OO.BTree()
+
+    def updateData(self, data_cache):
+ if 'first_active' in self.data and 'first_active' in data_cache:
+ data_cache.pop('first_active')
+ self.data.update(data_cache)
+
+ def jobError(self, failure, data_cache):
+ return self._process(failure, data_cache, self.internal_exceptions)
+
+ def transactionError(self, failure, data_cache):
+ return self._process(failure, data_cache, self.transaction_exceptions)
+
+ def _process(self, failure, data_cache, exceptions):
+ for (exc, key, max_count, init_backoff,
+ incr_backoff, max_backoff) in exceptions:
+ if failure.check(*exc) is not None:
+ count = data_cache.get(key, 0) + 1
+ if max_count is not None and count >= max_count:
+ return False
+ backoff = min(max_backoff,
+ (init_backoff + (count-1) * incr_backoff))
+ if backoff:
+ time.sleep(backoff)
+ data_cache[key] = count
+ data_cache['last_' + key] = failure
+ if 'first_active' not in data_cache:
+ data_cache['first_active'] = self.parent.active_start
+ return True
+ return False
+
+ def interrupted(self):
+ if 'first_active' not in self.data:
+ self.data['first_active'] = self.parent.active_start
+ ct = self.data['interruptions'] = self.data.get('interruptions', 0) + 1
+ return self.max_interruptions is None or ct <= self.max_interruptions
+
+class RetrySystemErrorsForever(Retries): # default for 'callback' IRetries
+    # retry ZEO failures and transaction errors during the job forever;
+    # also retry transaction errors at commit, and interruptions, forever.
+ internal_exceptions = (
+ ((ZEO.Exceptions.ClientDisconnected,), 'zeo_disconnected',
+ None, 5, 5, 60),
+ ((ZODB.POSException.TransactionError,), 'transaction_error',
+ None, 0, 0, 0),
+ )
+
+ max_interruptions = None
+
+ def transactionError(self, failure, data_cache):
+        res = super(RetrySystemErrorsForever, self).transactionError(
+            failure, data_cache)
+ if not res:
+ # that just means we didn't record it. We actually are going to
+ # retry.
+            data_cache['other'] = data_cache.get('other', 0) + 1
+ data_cache['last_other'] = failure
+ if 'first_active' not in data_cache:
+ data_cache['first_active'] = self.parent.active_start
+ return True # always retry
+
class Job(zc.async.utils.Base):
zope.interface.implements(zc.async.interfaces.IJob)
@@ -82,7 +189,26 @@
_status = zc.async.interfaces.NEW
_begin_after = _begin_by = _active_start = _active_end = None
key = None
-
+    # default_retry_policy and retry_policy should each be a name used to
+    # adapt the job to IRetries, or a factory, or None.
+ default_retry_policy = ''
+ retry_policy = None
+ retries = None
+ default_error_log_level = logging.ERROR
+ error_log_level = None
+
+ @property
+ def effective_error_log_level(self):
+ if self.error_log_level is None:
+ return self.default_error_log_level
+ return self.error_log_level
+
+ @property
+ def effective_retry_policy(self):
+ if self.retry_policy is None:
+ return self.default_retry_policy
+ return self.retry_policy
+
assignerUUID = None
_quota_names = ()
@@ -227,7 +353,8 @@
@rwproperty.setproperty
def callable(self, value):
# can't pickle/persist methods by default as of this writing, so we
- # add the sugar ourselves
+ # add the sugar ourselves. In future, would like for args to be
+ # potentially methods of persistent objects too...
if self._status != zc.async.interfaces.NEW:
raise zc.async.interfaces.BadStatusError(
'can only set callable when a job has NEW, PENDING, or '
@@ -243,12 +370,25 @@
if zc.async.interfaces.IJob.providedBy(self._callable_root):
self._callable_root.parent = self
- def addCallbacks(self, success=None, failure=None):
+ def addCallbacks(self, success=None, failure=None,
+ error_log_level=None, retry_policy=None):
if success is not None or failure is not None:
if success is not None:
success = zc.async.interfaces.IJob(success)
+ success.default_error_log_level = logging.CRITICAL
+ if error_log_level is not None:
+ success.error_log_level = error_log_level
+ success.default_retry_policy = 'callback'
+ if retry_policy is not None:
+ success.retry_policy = retry_policy
if failure is not None:
failure = zc.async.interfaces.IJob(failure)
+ failure.default_error_log_level = logging.CRITICAL
+ if error_log_level is not None:
+ failure.error_log_level = error_log_level
+ failure.default_retry_policy = 'callback'
+ if retry_policy is not None:
+ failure.retry_policy = retry_policy
res = Job(success_or_failure, success, failure)
if success is not None:
success.parent = res
@@ -260,12 +400,18 @@
abort_handler = zc.async.interfaces.IJob(
completeStartedJobArguments)
abort_handler.args.append(res)
- res.addCallback(abort_handler)
+ res.addCallback(abort_handler, error_log_level)
+ abort_handler.default_error_log_level = logging.CRITICAL
+ if error_log_level is not None:
+ abort_handler.error_log_level = error_log_level
+ abort_handler.default_retry_policy = 'callback'
+ if retry_policy is not None:
+ abort_handler.retry_policy = retry_policy
else:
res = self
return res
- def addCallback(self, callback):
+ def addCallback(self, callback, error_log_level=None, retry_policy=None):
callback = zc.async.interfaces.IJob(callback)
self.callbacks.put(callback)
callback.parent = self
@@ -274,8 +420,39 @@
else:
self._p_changed = True # to try and fire conflict errors if
# our reading of self.status has changed beneath us
+ callback.default_error_log_level = logging.CRITICAL
+ if error_log_level is not None:
+ callback.error_log_level = error_log_level
+ callback.default_retry_policy = 'callback'
+ if retry_policy is not None:
+ callback.retry_policy = retry_policy
return callback
+ def _getRetry(self, call_name, tm, *args):
+ def getRetry():
+ retries = self.retries
+ if retries is None:
+ retry_policy = self.effective_retry_policy
+ if retry_policy is None:
+ return None # means, do not retry ever
+ elif isinstance(retry_policy, basestring):
+ retries = zope.component.getAdapter(
+ self, zc.async.interfaces.IRetries,
+ name=retry_policy)
+ else:
+ retries = retry_policy(self)
+ if retries is not None:
+ self.retries = retries
+ call = getattr(retries, call_name, None)
+ if call is None:
+ zc.async.utils.log.error(
+ 'retries %r for %r does not have required %s method',
+ retries, self, call_name)
+ return None
+ return call(*args)
+ identifier = 'getting %s retry for %r' % (call_name, self)
+ return zc.async.utils.never_fail(getRetry, identifier, tm)
+
def __call__(self, *args, **kwargs):
if self.status not in (zc.async.interfaces.NEW,
zc.async.interfaces.ASSIGNED):
@@ -289,68 +466,146 @@
effective_args[0:0] = self.args
effective_kwargs = dict(self.kwargs)
effective_kwargs.update(kwargs)
- return self._call_with_retry(
- lambda: self.callable(*effective_args, **effective_kwargs))
-
- def _call_with_retry(self, call):
- ct = 0
- tm = transaction.interfaces.ITransactionManager(self)
+ # this is the calling code. It is complex and long because it is
+ # trying both to handle exceptions reasonably, and to honor the
+ # IRetries interface for those exceptions.
+        data_cache = {}
+        tm = transaction.interfaces.ITransactionManager(self)
res = None
while 1:
try:
- res = call()
- if zc.async.interfaces.IJob.providedBy(res):
- res.addCallback(self._callback)
- tm.commit()
- elif isinstance(res, twisted.internet.defer.Deferred):
- res.addBoth(zc.twist.Partial(self._callback))
- tm.commit()
- else:
- res = self._complete(res, tm)
- except ZODB.POSException.TransactionError:
+ res = self.callable(*effective_args, **effective_kwargs)
+ except zc.async.utils.EXPLOSIVE_ERRORS:
tm.abort()
- ct += 1
- if ct >= 5:
- res = self._complete(zc.twist.Failure(), tm)
- self.resumeCallbacks()
- else:
+ raise
+ except:
+ res = zc.twist.Failure()
+ tm.abort()
+ retry = self._getRetry('jobError', tm, res, data_cache)
+ if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+ identifier = (
+ 'rescheduling %r as requested by '
+ 'associated IRetries %r' % (
+ self, self.retries))
+ if self is zc.async.utils.never_fail(
+ lambda: self._reschedule(retry, data_cache),
+ identifier, tm):
+ return self
+ elif retry:
continue
- except zc.twist.EXPLOSIVE_ERRORS:
+ try:
+                self._set_result(res, data_cache)
+ except zc.async.utils.EXPLOSIVE_ERRORS:
tm.abort()
raise
except:
+ if isinstance(res, twisted.python.failure.Failure):
+ zc.async.utils.log.log(
+ self.effective_error_log_level,
+ 'Commit failed for %r (see subsequent traceback). '
+ 'Prior to this, job originally failed with '
+ 'traceback:\n%s',
+ self,
+ res.getTraceback(
+ elideFrameworkCode=True, detail='verbose'))
+ else:
+ zc.async.utils.tracelog.info(
+ 'Commit failed for %r (see subsequent traceback). '
+ 'Prior to this, job succeeded with result: %r',
+ self, res)
+ res = zc.twist.Failure()
tm.abort()
- res = self._complete(zc.twist.Failure(), tm)
- self.resumeCallbacks()
- else:
- if self._status == zc.async.interfaces.CALLBACKS:
- self.resumeCallbacks()
+ retry = self._getRetry('jobError', tm, res, data_cache)
+ if isinstance(retry, (datetime.timedelta, datetime.datetime)):
+ identifier = (
+ 'rescheduling %r as requested by '
+ 'associated IRetries %r' % (
+ self, self.retries))
+ if self is zc.async.utils.never_fail(
+ lambda: self._reschedule(retry, data_cache),
+ identifier, tm):
+ return self
+ elif retry:
+ continue
+ # retries didn't exist or returned False
+ def complete():
+ self._result = res
+ self._status = zc.async.interfaces.CALLBACKS
+ self._active_end = datetime.datetime.now(pytz.UTC)
+ if self.retries is not None:
+ self.retries.updateData(data_cache)
+ identifier = ('storing failure at commit of %r' % (self,))
+ zc.async.utils.never_fail(complete, identifier, tm)
+ self._complete(res)
return res
- def _callback(self, res):
- self._call_with_retry(lambda: res)
+ def handleInterrupt(self):
+        # this is called either within a job (one with a never-fail
+        # retry policy) or within _resumeCallbacks (which uses never_fail)
+        if self.status != zc.async.interfaces.ACTIVE:
+ raise zc.async.interfaces.BadStatusError(
+ 'can only call ``handleInterrupt`` on a job with ACTIVE '
+ 'status')
+ tm = transaction.interfaces.ITransactionManager(self)
+ retry = self._getRetry('interrupted', tm)
+ if retry:
+ if not isinstance(retry, (datetime.timedelta, datetime.datetime)):
+ retry = datetime.timedelta()
+ self._reschedule(retry)
+ else:
+ self.fail()
- def _complete(self, res, tm):
- if isinstance(res, twisted.python.failure.Failure):
- res = zc.twist.sanitize(res)
- failure = True
+ def _reschedule(self, retry, data_cache=None):
+ if not zc.async.interfaces.IAgent.providedBy(self.parent):
+ zc.async.utils.log.error(
+ 'error for IRetries %r on %r: '
+ 'can only reschedule a job directly in an agent',
+ self.retries, self)
+ return None
+ self._status = zc.async.interfaces.NEW
+ del self._active_start
+ if data_cache is not None and self.retries is not None:
+ self.retries.updateData(data_cache)
+ self.parent.reschedule(self, retry)
+ return self
+
+    def _set_result(self, res, data_cache=None):
+        tm = transaction.interfaces.ITransactionManager(self)
+        if zc.async.interfaces.IJob.providedBy(res):
+ res.addCallback(self._callback)
+ elif isinstance(res, twisted.internet.defer.Deferred):
+ res.addBoth(zc.twist.Partial(self._callback))
+ # XXX need to tell Partial to retry forever
else:
- failure = False
- self._result = res
- self._status = zc.async.interfaces.CALLBACKS
- self._active_end = datetime.datetime.now(pytz.UTC)
+ if isinstance(res, twisted.python.failure.Failure):
+ res = zc.twist.sanitize(res)
+ self._result = res
+ self._status = zc.async.interfaces.CALLBACKS
+ self._active_end = datetime.datetime.now(pytz.UTC)
+            if data_cache is not None and self.retries is not None:
+ self.retries.updateData(data_cache)
tm.commit()
- if failure:
- zc.async.utils.tracelog.error(
+
+ def _complete(self, res):
+ if isinstance(res, twisted.python.failure.Failure):
+ zc.async.utils.log.log(
+ self.effective_error_log_level,
'%r failed with traceback:\n%s',
self,
- res.getTraceback(elideFrameworkCode=True, detail='verbose'))
+ res.getTraceback(
+ elideFrameworkCode=True, detail='verbose'))
else:
zc.async.utils.tracelog.info(
- '%r succeeded with result:\n%r',
+ '%r succeeded with result: %r',
self, res)
- return res
+ self.resumeCallbacks()
+ def _callback(self, res):
+ # done within a job or partial, so we can rely on their retry bits to
+ # some degree. However, we commit transactions ourselves, so we have
+ # to be a bit careful that the result hasn't been set already.
+ if self._status == zc.async.interfaces.ACTIVE:
+ self._set_result(res)
+ self._complete(res)
+
def fail(self, e=None):
if e is None:
e = zc.async.interfaces.AbortedError()
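
The retry machinery above adapts the job to IRetries (by name when
``retry_policy`` is a string) and calls ``jobError``, ``interrupted``,
and ``updateData`` on the result; a false return means give up, a
timedelta or datetime means reschedule, and any other true value means
retry immediately. The IRetries interface itself is not part of this
checkin, so the following is an inferred sketch, not the real
implementation:

    import datetime

    class LimitedRetries(object):
        # Inferred from _getRetry, __call__, and handleInterrupt above.
        def __init__(self, job):
            self.job = job
            self.errors = 0
        def jobError(self, failure, data_cache):
            # The job's callable raised: give up after three tries,
            # otherwise ask the agent to reschedule with a growing delay.
            self.errors += 1
            if self.errors >= 3:
                return False  # the job completes with the Failure
            return datetime.timedelta(seconds=30 * self.errors)
        def interrupted(self):
            # The dispatcher died mid-job; a true non-date value is
            # converted to a zero timedelta, i.e. retry immediately.
            return True
        def updateData(self, data_cache):
            pass  # persist per-attempt state gathered in data_cache
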
@@ -359,14 +614,17 @@
raise zc.async.interfaces.BadStatusError(
'can only call fail on a job with NEW, PENDING, ASSIGNED, or '
'ACTIVE status')
- self._complete(zc.twist.Failure(e),
- transaction.interfaces.ITransactionManager(self))
- self.resumeCallbacks()
+ self._complete(zc.twist.Failure(e))
def resumeCallbacks(self):
if self._status != zc.async.interfaces.CALLBACKS:
raise zc.async.interfaces.BadStatusError(
'can only resumeCallbacks on a job with CALLBACKS status')
+ identifier = 'performing callbacks for %r' % (self,)
+ tm = transaction.interfaces.ITransactionManager(self)
+ zc.async.utils.never_fail(self._resumeCallbacks, identifier, tm)
+
+ def _resumeCallbacks(self):
callbacks = list(self.callbacks)
tm = transaction.interfaces.ITransactionManager(self)
length = 0
@@ -379,27 +637,21 @@
elif j._status == zc.async.interfaces.ACTIVE:
zc.async.utils.tracelog.debug(
'failing aborted callback %r to %r', j, self)
- j.fail()
+ j.handleInterrupt()
elif j._status == zc.async.interfaces.CALLBACKS:
j.resumeCallbacks()
# TODO: this shouldn't raise anything we want to catch, right?
# now, this should catch all the errors except EXPLOSIVE_ERRORS
# cleaning up dead jobs should look something like the above.
- tm.commit()
tm.begin() # syncs
# it's possible that someone added some callbacks, so run until
# we're exhausted.
length += len(callbacks)
callbacks = list(self.callbacks)[length:]
if not callbacks:
- try:
- self._status = zc.async.interfaces.COMPLETED
- if zc.async.interfaces.IAgent.providedBy(self.parent):
- self.parent.jobCompleted(self)
- tm.commit()
- except ZODB.POSException.TransactionError:
- tm.abort()
- callbacks = list(self.callbacks)[length:]
- else:
- break # and return
+ # this whole method is called within a never_fail...
+ self._status = zc.async.interfaces.COMPLETED
+ if zc.async.interfaces.IAgent.providedBy(self.parent):
+ self.parent.jobCompleted(self)
+ tm.commit()
Modified: zc.async/branches/dev/src/zc/async/queue.py
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.py 2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/queue.py 2008-05-20 01:08:43 UTC (rev 86848)
@@ -110,7 +110,7 @@
queue.put(job)
job.assignerUUID = tmp
elif job.status == zc.async.interfaces.ACTIVE:
- queue.put(job.fail)
+ queue.put(job.handleInterrupt)
elif job.status == zc.async.interfaces.CALLBACKS:
queue.put(job.resumeCallbacks)
elif job.status == zc.async.interfaces.COMPLETED:
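
With this change a job found ACTIVE after a dispatcher death is given a
chance to resume rather than being failed outright. Roughly, with
``retries``, ``agent``, and ``job`` standing in for the job's IRetries
adapter, parent agent, and the interrupted job:

    import datetime
    retry = retries.interrupted()
    if retry:
        if not isinstance(retry, (datetime.timedelta, datetime.datetime)):
            retry = datetime.timedelta()  # retry as soon as possible
        agent.reschedule(job, retry)
    else:
        job.fail()  # the old, unconditional behavior
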
@@ -238,8 +238,13 @@
self.dispatchers = Dispatchers()
self.dispatchers.__parent__ = self
- def put(self, item, begin_after=None, begin_by=None):
+ def put(self, item, begin_after=None, begin_by=None,
+ error_log_level=None, retry_policy=None):
item = zc.async.interfaces.IJob(item)
+ if error_log_level is not None:
+ item.error_log_level = error_log_level
+ if retry_policy is not None:
+ item.retry_policy = retry_policy
if item.assignerUUID is not None:
raise ValueError(
'cannot add already-assigned job')
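
Callers can now set both knobs at enqueue time. A sketch, with
``queue`` an existing zc.async queue and ``process_batch`` an invented
callable ('callback' is the only named retry policy visible in this
diff):

    import logging
    job = queue.put(process_batch,
                    error_log_level=logging.WARNING,
                    retry_policy='callback')
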
Modified: zc.async/branches/dev/src/zc/async/utils.py
===================================================================
--- zc.async/branches/dev/src/zc/async/utils.py 2008-05-20 01:06:56 UTC (rev 86847)
+++ zc.async/branches/dev/src/zc/async/utils.py 2008-05-20 01:08:43 UTC (rev 86848)
@@ -15,6 +15,10 @@
import logging
import sys
+import time
+import ZEO.Exceptions
+import ZODB.POSException
+import zc.twist
import rwproperty
import persistent
import zc.dict
@@ -22,6 +24,15 @@
import zope.bforest.periodic
+EXPLOSIVE_ERRORS = (SystemExit, KeyboardInterrupt)
+
+SYSTEM_ERRORS = (ZEO.Exceptions.ClientDisconnected,
+ ZODB.POSException.POSKeyError)
+
+INITIAL_BACKOFF = 5
+MAX_BACKOFF = 60
+BACKOFF_INCREMENT = 5
+
def simpleWrapper(name):
# notice use of "simple" in function name! A sure sign of trouble!
def wrapper(self, *args, **kwargs):
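
These constants give the helpers below a linear backoff: the wait
between consecutive errors grows from 5 seconds in 5-second steps,
capping at 60 seconds from the twelfth consecutive error on.
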
@@ -236,3 +247,113 @@
def __len__(self):
return len(self._data)
+
+def never_fail(call, identifier, tm):
+ # forever for TransactionErrors; forever, with backoff, for anything else
+ trans_ct = 0
+ backoff_ct = 0
+ backoff = INITIAL_BACKOFF
+ res = None
+ while 1:
+ try:
+ res = call()
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ trans_ct += 1
+ if not trans_ct % 5:
+ log.warning(
+ '%d consecutive transaction errors while %s',
+                    trans_ct, identifier, exc_info=True)
+ res = None
+ except EXPLOSIVE_ERRORS:
+ tm.abort()
+ raise
+ except Exception, e:
+ if isinstance(e, SYSTEM_ERRORS):
+ level = logging.ERROR
+ else:
+ level = logging.CRITICAL
+ tm.abort()
+ backoff_ct += 1
+ if backoff_ct == 1:
+ log.log(level,
+ 'first error while %s; will continue in %d seconds',
+ identifier, backoff, exc_info=True)
+ elif not backoff_ct % 5:
+ log.log(level,
+ '%d consecutive errors while %s; '
+ 'will continue in %d seconds',
+ backoff_ct, identifier, backoff, exc_info=True)
+ res = None
+ time.sleep(backoff)
+ backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
+ else:
+ return res
+
+def wait_for_system_recovery(call, identifier, tm):
+    # forever for TransactionErrors; forever, with backoff, for
+    # SYSTEM_ERRORS; fail (return a Failure) for anything else
+ trans_ct = 0
+ backoff_ct = 0
+ backoff = INITIAL_BACKOFF
+ res = None
+ while 1:
+ try:
+ res = call()
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ trans_ct += 1
+ if not trans_ct % 5:
+ log.warning(
+ '%d consecutive transaction errors while %s',
+                    trans_ct, identifier, exc_info=True)
+ res = None
+ except EXPLOSIVE_ERRORS:
+ tm.abort()
+ raise
+ except SYSTEM_ERRORS:
+ tm.abort()
+ backoff_ct += 1
+ if backoff_ct == 1:
+ log.error('first error while %s; will continue in %d seconds',
+ identifier, backoff, exc_info=True)
+ elif not backoff_ct % 5:
+ log.error('%d consecutive errors while %s; '
+ 'will continue in %d seconds',
+ backoff_ct, identifier, backoff, exc_info=True)
+ res = None
+ time.sleep(backoff)
+ backoff = min(MAX_BACKOFF, backoff + BACKOFF_INCREMENT)
+ except:
+ log.error('Error while %s', identifier, exc_info=True)
+ tm.abort()
+ return zc.twist.Failure()
+ else:
+ return res
+
+def try_transaction_five_times(call, identifier, tm):
+ ct = 0
+ res = None
+ while 1:
+ try:
+ res = call()
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ ct += 1
+ if ct >= 5:
+ log.error('Five consecutive transaction errors while %s',
+ identifier, exc_info=True)
+ res = zc.twist.Failure()
+ else:
+ continue
+ except EXPLOSIVE_ERRORS:
+ tm.abort()
+ raise
+ except:
+ tm.abort()
+ log.error('Error while %s', identifier, exc_info=True)
+ res = zc.twist.Failure()
+ return res
\ No newline at end of file
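
The three helpers form a ladder of durability: try_transaction_five_times
gives up after five conflicts, wait_for_system_recovery additionally
rides out ZEO/ZODB outages but fails on any other exception, and
never_fail retries everything short of EXPLOSIVE_ERRORS. A usage sketch
for never_fail, with an invented bookkeeping function (never_fail
commits for you, so the callable itself should not):

    import transaction
    import zc.async.interfaces
    from zc.async.utils import never_fail

    def mark_completed():
        job._status = zc.async.interfaces.COMPLETED  # illustrative only

    tm = transaction.interfaces.ITransactionManager(job)
    never_fail(mark_completed, 'completing %r' % (job,), tm)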