[Checkins] SVN: zc.async/trunk/src/zc/async/ some more tests of
handling catastrophes
Gary Poster
gary at zope.com
Fri May 9 21:37:39 EDT 2008
Log message for revision 86598:
some more tests of handling catastrophes
Changed:
U zc.async/trunk/src/zc/async/CHANGES.txt
U zc.async/trunk/src/zc/async/TODO.txt
U zc.async/trunk/src/zc/async/catastrophes.txt
U zc.async/trunk/src/zc/async/subscribers.py
U zc.async/trunk/src/zc/async/testing.py
-=-
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt 2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/CHANGES.txt 2008-05-10 01:37:38 UTC (rev 86598)
@@ -26,8 +26,8 @@
above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
recognize.
-- Bugfix: in failing a job, the job thought it was in agent, and the ``fail``
- call failed. This is not tested by the first example in new doctest
+- Bugfix: in failing a job, the job thought it was in its old agent, and the
+ ``fail`` call failed. This is now tested by the first example in the new doctest
``catastrophes.txt``.
1.1 (2008-04-24)
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt 2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/TODO.txt 2008-05-10 01:37:38 UTC (rev 86598)
@@ -4,11 +4,13 @@
- need CRITICAL logs for callbacks
-- when database went away
+- when database went away, and then came back, async didn't come back.
+- be even more pessimistic about memory for saved polls and job info in
+ dispatcher.
+
For future versions:
-- Write the z3monitor tests.
- queues should be pluggable like agent with filter
- show how to broadcast, maybe add conveniences
- show how to use with collapsing jobs (hint to future self: use external queue
@@ -16,7 +18,8 @@
- write tips and tricks
* avoid long transactions if possible. really avoid long transactions
involving frequently written objects. Discuss ramifications and
- strategies.
+ strategies, such as doing the big work in one job and then scheduling a
+ callback that actually writes the data into the hotspot.
* in zope.app.testing.functional tests, zc.async doesn't do well being
started in a layer's setup because then it is associated with the
wrapped layer DB, and the test is associated with the DemoStorage wrapper,
Modified: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt 2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/catastrophes.txt 2008-05-10 01:37:38 UTC (rev 86598)
@@ -27,7 +27,8 @@
These are the scenarios we'll contemplate:
- The system has a single dispatcher. The dispatcher is working on a job with a
- callback. The dispatcher dies, and then restarts, cleaning up.
+ callback. The dispatcher dies, and then restarts, cleaning up. We'll do two
+ variants, one with a graceful shutdown and one with a hard crash.
- The system has two dispatchers. One dispatcher is working on a job with a
callback, and then dies. The other dispatcher cleans up.
@@ -120,10 +121,273 @@
case, simply returning a string--once the dispatcher got back online
[#cleanup1]_.
---------------------------------------------------------------
-Dispatcher Dies "Hard" While Performing a Job, Sibling Resumes
---------------------------------------------------------------
+------------------------------------------------
+Dispatcher Crashes "Hard" While Performing a Job
+------------------------------------------------
+Our next catastrophe changes only one aspect of the previous one: the
+dispatcher does not stop gracefully, and does not have a chance to clean up its
+active jobs. It is a "hard" crash.
+
+To show this, we will start a job, simulate the dispatcher dying "hard," and
+restart it so it can clean up.
+
+So, first we start a long-running job in the dispatcher.
+
+ >>> lock.acquire()
+ True
+ >>> job = queue.put(wait_for_me)
+ >>> callback_job = job.addCallbacks(failure=handle_error)
+ >>> transaction.commit()
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> wait_for_start(job)
+
+Now we'll "crash" the dispatcher.
+
+ >>> dispatcher.activated = False # this will make polling stop, without
+ ... # cleanup
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
+ >>> dispatcher.thread.join(3)
+
+Hard crashes can be detected because the dispatchers write datetimes to the
+database every few polls. A given dispatcher instance does this for each queue
+on a ``DispatcherAgents`` object available in ``queue.dispatchers[UUID]``,
+where ``UUID`` is the uuid of that dispatcher.
+
+The ``DispatcherAgents`` object has four pertinent attributes:
+``ping_interval``, ``ping_death_interval``, ``last_ping``, and ``dead``. About
+every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is supposed
+to write a ``datetime`` to ``last_ping``. If the ``last_ping`` plus the
+``ping_death_interval`` (also a ``timedelta``) is older than now, the
+dispatcher is considered to be ``dead``, and old jobs should be cleaned up.
+
+The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
+defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
+least two or three poll intervals (``zc.async.dispatcher.get().poll_interval``)
+greater than the ``ping_interval``.
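+
+As a minimal sketch of that arithmetic (the ``is_dead`` helper here is
+hypothetical, for illustration only; the real check lives on the
+``DispatcherAgents`` object):
+
+ >>> import datetime
+ >>> def is_dead(last_ping, ping_death_interval, now):
+ ...     # dead once last_ping + ping_death_interval is older than now
+ ...     return last_ping + ping_death_interval < now
+ ...
+ >>> now = datetime.datetime(2008, 5, 10)
+ >>> is_dead(now - datetime.timedelta(seconds=90),
+ ...         datetime.timedelta(seconds=60), now)
+ True
+ >>> is_dead(now - datetime.timedelta(seconds=30),
+ ...         datetime.timedelta(seconds=60), now)
+ False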
+
+The ping hasn't timed out yet, so the dispatcher isn't considered dead.
+
+ >>> _ = transaction.begin()
+ >>> import zc.async.instanceuuid
+ >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+ >>> da.ping_death_interval
+ datetime.timedelta(0, 60)
+ >>> da.ping_interval
+ datetime.timedelta(0, 30)
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+
+Therefore, the job is still sitting around in the dispatcher's pile in the
+database (the ``main`` key is for the ``main`` agent installed in this
+dispatcher in the setup for these examples).
+
+ >>> job in da['main']
+ True
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+
+Let's start our dispatcher up again.
+
+ >>> old_dispatcher = dispatcher
+ >>> zc.async.dispatcher.clear()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+ >>> dispatcher = zc.async.dispatcher.get()
+
+Initially, it's going to be a bit confused, because it sees that the
+``DispatcherAgents`` object is ``activated``, and not ``dead``. It can't tell
+if there's another process using the same UUID, or if it is looking at the
+result of a hard crash.
+
+ >>> zc.async.testing.wait_for_result(job, seconds=1)
+ Traceback (most recent call last):
+ ...
+ AssertionError: job never completed
+ >>> zc.async.testing.get_poll(dispatcher, seconds=1)
+ {'': None}
+ >>> for r in reversed(event_logs.records):
+ ...     if r.levelname == 'ERROR':
+ ...         break
+ ... else:
+ ...     assert False, 'did not find log'
+ ...
+ >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ UUID ... already activated in queue (oid 4): another process?
+ (To stop poll attempts in this process, set
+ ``zc.async.dispatcher.get().activated = False``. To stop polls
+ permanently, don't start a zc.async.dispatcher!)
+
+
+To speed up our dispatcher's realization that the previous activation is
+``dead``, we'll set the ``ping_death_interval`` to just one second.
+
+ >>> _ = transaction.begin()
+ >>> da.dead
+ False
+ >>> import datetime
+ >>> da.ping_death_interval = datetime.timedelta(seconds=1)
+ >>> da.dead
+ True
+ >>> bool(da.activated)
+ True
+ >>> transaction.commit()
+
+After the next poll, the dispatcher will have cleaned up its old tasks in the
+same way we saw in the previous example. The job's ``fail`` method will be
+called, and the callback will be performed. The ``DispatcherAgents`` object is
+no longer ``dead``, because it is tied to the new instance of the dispatcher.
+
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> _ = transaction.begin()
+ >>> job in da['main']
+ False
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+ >>> fail_job = job.parent
+ >>> fail_job
+ <zc.async.job.Job (oid 78, db 'unnamed') ``zc.async.job.Job (oid 57, db 'unnamed') :fail()``>
+
+Let's see it happen.
+
+ >>> zc.async.testing.wait_for_result(fail_job)
+ >>> job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> job.result
+ <zc.twist.Failure zc.async.interfaces.AbortedError>
+ >>> callback_job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> callback_job.result
+ '...I handled the error...'
+
+The dispatcher cleaned up its own "hard" crash.
+
+[#cleanup2]_
+
+-----------------------------------------------------------------
+Dispatcher Crashes "Hard" While Performing a Job, Sibling Resumes
+-----------------------------------------------------------------
+
+Our next catastrophe is the same as the one before, except that, after one
+dispatcher's hard crash, another dispatcher is around to clean up the dead
+jobs.
+
+To show this, we will start a job, start a second dispatcher, simulate the
+first dispatcher dying "hard," and watch the second dispatcher clean up
+after the first one.
+
+So, first we start a long-running job in the dispatcher as before.
+
+ >>> lock.acquire()
+ True
+ >>> job = queue.put(wait_for_me)
+ >>> callback_job = job.addCallbacks(failure=handle_error)
+ >>> transaction.commit()
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> wait_for_start(job)
+
+Now we'll start up an alternate dispatcher.
+
+ >>> import uuid
+ >>> alt_uuid = uuid.uuid1()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.5, uuid=alt_uuid)(
+ ... zc.async.interfaces.DatabaseOpened(db))
+ >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
+
+We're also going to set the main dispatcher's ``ping_death_interval`` back to
+60 seconds so we can see some polls in the alternate dispatcher before it gets
+around to cleaning up.
+
+ >>> da.ping_death_interval = datetime.timedelta(seconds=60)
+ >>> transaction.commit()
+
+Now we'll "crash" the dispatcher.
+
+ >>> dispatcher.activated = False # this will make polling stop, without
+ ... # cleanup
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
+ >>> dispatcher.thread.join(3)
+
+As discussed in the previous example, the polling hasn't timed out yet, so the
+alternate dispatcher can't know that the first one is dead. Therefore, the job
+is still sitting around in the old dispatcher's pile in the database.
+
+ >>> _ = transaction.begin()
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+ >>> alt_poll_1 = zc.async.testing.get_poll(alt_dispatcher)
+ >>> _ = transaction.begin()
+ >>> job in da['main']
+ True
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+ >>> alt_poll_2 = zc.async.testing.get_poll(alt_dispatcher)
+ >>> _ = transaction.begin()
+ >>> job in da['main']
+ True
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ False
+
+Above, the ``ping_death_interval`` was returned to the default of 60 seconds.
+To speed up our second dispatcher's realization that the first one is dead,
+we'll set the ``ping_death_interval`` back down to just one second.
+
+ >>> da.ping_death_interval
+ datetime.timedelta(0, 60)
+ >>> import datetime
+ >>> da.ping_death_interval = datetime.timedelta(seconds=1)
+ >>> da.dead
+ True
+ >>> bool(da.activated)
+ True
+ >>> transaction.commit()
+
+After the second dispatcher gets a poll--a chance to notice--it will have
+cleaned up the first dispatcher's old tasks in the same way we saw in the
+previous example. The job's ``fail`` method will be called, and the callback
+will be performed.
+
+ >>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
+ >>> _ = transaction.begin()
+ >>> job in da['main']
+ False
+ >>> bool(da.activated)
+ False
+ >>> da.dead
+ True
+ >>> fail_job = job.parent
+ >>> fail_job
+ <zc.async.job.Job (oid 121, db 'unnamed') ``zc.async.job.Job (oid 84, db 'unnamed') :fail()``>
+ >>> zc.async.testing.wait_for_result(fail_job)
+ >>> job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> job.result
+ <zc.twist.Failure zc.async.interfaces.AbortedError>
+ >>> callback_job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> callback_job.result
+ '...I handled the error...'
+
+The sibling, then, was able to clean up the mess left by the "hard" crash of
+the first dispatcher.
+
+[#cleanup3]_
+
--------------
Callback Fails
--------------
@@ -143,9 +407,10 @@
There are some catastrophes from which there are no easy fixes like these. For
instance, imagine you have communicated with an external system, and gotten a
reply that you have successfully made a transaction there, but then the
-dispatcher dies, or the database disappears, before you have a chance to redo.
+dispatcher dies, or the database disappears, before you have a chance to commit
+the local transaction recording the success. Your code needs to see that
-[#last_cleanup]_
+...multidatabase...
.. ......... ..
.. Footnotes ..
@@ -199,7 +464,29 @@
>>> old_dispatcher.thread.join(3)
>>> old_dispatcher.dead_pools[0].threads[0].join(3)
-.. [#last_cleanup]
+.. [#cleanup2]
- >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ >>> lock.release()
+ >>> old_dispatcher.thread.join(3)
+ >>> for queue_pools in old_dispatcher.queues.values():
+ ...     for name, pool in queue_pools.items():
+ ...         pool.setSize(0)
+ ...         for thread in pool.threads:
+ ...             thread.join(3)
+ ...
+ -3
+
+.. [#cleanup3]
+
+ >>> lock.release()
>>> dispatcher.thread.join(3)
+ >>> for queue_pools in dispatcher.queues.values():
+ ...     for name, pool in queue_pools.items():
+ ...         pool.setSize(0)
+ ...         for thread in pool.threads:
+ ...             thread.join(3)
+ ...
+ -3
+ >>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
+ >>> alt_dispatcher.thread.join(3)
+ >>> alt_dispatcher.dead_pools[0].threads[0].join(3)
Modified: zc.async/trunk/src/zc/async/subscribers.py
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.py 2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/subscribers.py 2008-05-10 01:37:38 UTC (rev 86598)
@@ -80,9 +80,13 @@
class ThreadedDispatcherInstaller(object):
    def __init__(self,
                 poll_interval=5,
-                reactor_factory=twisted.internet.selectreactor.SelectReactor):
+                reactor_factory=twisted.internet.selectreactor.SelectReactor,
+                uuid=None): # optional uuid is really just for tests; see
+                            # catastrophes.txt, for instance, which runs
+                            # two dispatchers simultaneously.
        self.poll_interval = poll_interval
        self.reactor_factory = reactor_factory
+        self.uuid = uuid
        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
        # package is around
        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
@@ -90,7 +94,8 @@
    def __call__(self, ev):
        reactor = self.reactor_factory()
        dispatcher = zc.async.dispatcher.Dispatcher(
-            ev.database, reactor, poll_interval=self.poll_interval)
+            ev.database, reactor, poll_interval=self.poll_interval,
+            uuid=self.uuid)
        def start():
            dispatcher.activate()
            reactor.run(installSignalHandlers=0)
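
A minimal usage sketch for the new ``uuid`` argument (mirroring the calls in
catastrophes.txt above; assumes ``db`` is an open ZODB database):

    >>> import uuid
    >>> import zc.async.dispatcher, zc.async.interfaces, zc.async.subscribers
    >>> alt_uuid = uuid.uuid1()
    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
    ...     poll_interval=0.5, uuid=alt_uuid)(
    ...     zc.async.interfaces.DatabaseOpened(db))
    >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)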
Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py 2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/testing.py 2008-05-10 01:37:38 UTC (rev 86598)
@@ -183,18 +183,18 @@
# helper functions convenient for tests

-def get_poll(dispatcher, count=None):
+def get_poll(dispatcher, count=None, seconds=6):
    if count is None:
        count = len(dispatcher.polls)
-    for i in range(60):
+    for i in range(seconds * 10):
        if len(dispatcher.polls) > count:
            return dispatcher.polls.first()
        time.sleep(0.1)
    else:
        assert False, 'no poll!'

-def wait_for_result(job):
-    for i in range(60):
+def wait_for_result(job, seconds=6):
+    for i in range(seconds * 10):
        t = transaction.begin()
        if job.status == zc.async.interfaces.COMPLETED:
            return job.result
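
A usage sketch for the new ``seconds`` arguments (mirroring the calls in
catastrophes.txt above; both helpers still assert if the timeout passes):

    >>> poll = zc.async.testing.get_poll(dispatcher, seconds=1)
    >>> result = zc.async.testing.wait_for_result(job, seconds=1)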