[Checkins] SVN: zc.async/trunk/src/zc/async/ some more tests of handling catastrophes

Gary Poster gary at zope.com
Fri May 9 21:37:39 EDT 2008


Log message for revision 86598:
  some more tests of handling catastrophes

Changed:
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/catastrophes.txt
  U   zc.async/trunk/src/zc/async/subscribers.py
  U   zc.async/trunk/src/zc/async/testing.py

-=-
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-05-10 01:37:38 UTC (rev 86598)
@@ -26,8 +26,8 @@
   above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
   recognize.
 
-- Bugfix: in failing a job, the job thought it was in agent, and the ``fail``
-  call failed.  This is not tested by the first example in new doctest
+- Bugfix: in failing a job, the job thought it was in its old agent, and the
+  ``fail`` call failed. This is now tested by the first example in new doctest
   ``catastrophes.txt``.
 
 1.1 (2008-04-24)

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-05-10 01:37:38 UTC (rev 86598)
@@ -4,11 +4,13 @@
 
 - need CRITICAL logs for callbacks
 
-- when database went away
+- when database went away, and then came back, async didn't come back.
 
+- be even more pessimistic about memory for saved polls and job info in
+  dispatcher.
+
 For future versions:
 
-- Write the z3monitor tests.
 - queues should be pluggable like agent with filter
 - show how to broadcast, maybe add conveniences
 - show how to use with collapsing jobs (hint to future self: use external queue
@@ -16,7 +18,8 @@
 - write tips and tricks
   * avoid long transactions if possible.  really avoid long transactions
     involving frequently written objects.  Discuss ramifications and
-    strategies.
+    strategies, such as doing big work in one job, then in callback schedule
+    actually writing the data into the hotspot.
   * in zope.app.testing.functional tests, zc.async doesn't do well being
     started in a layer's setup because then it is associated with the
     wrapped layer DB, and the test is associated with the DemoStorage wrapper,

Modified: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt	2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/catastrophes.txt	2008-05-10 01:37:38 UTC (rev 86598)
@@ -27,7 +27,8 @@
 These are the scenarios we'll contemplate:
 
 - The system has a single dispatcher. The dispatcher is working on a job with a
-  callback. The dispatcher dies, and then restarts, cleaning up.
+  callback. The dispatcher dies, and then restarts, cleaning up.  We'll do two
+  variants, one with a graceful shutdown and one with a hard crash.
 
 - The system has two dispatchers. One dispatcher is working on a job with a
   callback, and then dies. The other dispatcher cleans up.
@@ -120,10 +121,273 @@
 case, simply returning a string--once the dispatcher got back online
 [#cleanup1]_.
 
---------------------------------------------------------------
-Dispatcher Dies "Hard" While Performing a Job, Sibling Resumes
---------------------------------------------------------------
+------------------------------------------------
+Dispatcher Crashes "Hard" While Performing a Job
+------------------------------------------------
 
+Our next catastrophe only changes one aspect to the previous one: the
+dispatcher does not stop gracefully, and does not have a chance to clean up its
+active jobs.  It is a "hard" crash.
+
+To show this, we will start a job, simulate the dispatcher dying "hard," and
+restart it so it clean up.
+
+So, first we start a long-running job in the dispatcher.
+
+    >>> lock.acquire()
+    True
+    >>> job = queue.put(wait_for_me)
+    >>> callback_job = job.addCallbacks(failure=handle_error)
+    >>> transaction.commit()
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> wait_for_start(job)
+
+Now we'll "crash" the dispatcher.
+
+    >>> dispatcher.activated = False # this will make polling stop, without
+    ...                              # cleanup
+    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
+    >>> dispatcher.thread.join(3)
+
+Hard crashes can be detected because the dispatchers write datetimes to the
+database every few polls. A given dispatcher instance does this for each queue
+on a ``DispatcherAgents`` object available in ``queue.dispatchers[UUID]``,
+where ``UUID`` is the uuid of that dispatcher.
+
+The ``DispatcherAgents`` object has four pertinent attributes:
+``ping_interval``, ``ping_death_interval``, ``last_ping``, and ``dead``. About
+every ``ping_interval`` (a ``datetime.timedelta``), the dispatcher is supposed
+to write a ``datetime`` to ``last_ping``. If the ``last_ping`` plus the
+``ping_death_interval`` (also a ``timedelta``) is older than now, the
+dispatcher is considered to be ``dead``, and old jobs should be cleaned up.
+
+The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
+defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
+least two or three poll intervals (``zc.async.dispatcher.get().poll_interval``)
+greater than the ``ping_interval``.
+
+The ping hasn't timed out yet, so the dispatcher isn't considered dead yet.
+
+    >>> _ = transaction.begin()
+    >>> import zc.async.instanceuuid
+    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+    >>> da.ping_death_interval
+    datetime.timedelta(0, 60)
+    >>> da.ping_interval
+    datetime.timedelta(0, 30)
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+
+Therefore, the job is still sitting around in the dispatcher's pile in the
+database (the ``main`` key is for the ``main`` agent installed in this
+dispatcher in the set up for these examples).
+
+    >>> job in da['main']
+    True
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+
+Let's start our dispatcher up again.
+
+    >>> old_dispatcher = dispatcher
+    >>> zc.async.dispatcher.clear()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+    >>> dispatcher = zc.async.dispatcher.get()
+
+Initially, it's going to be a bit confused, because it sees that the
+DispatcherAgents object is ``activated``, and not ``dead``. It can't tell if
+there's another process using its same UUID, or if it is looking at the result
+of a hard crash.
+
+    >>> zc.async.testing.wait_for_result(job, seconds=1)
+    Traceback (most recent call last):
+    ...
+    AssertionError: job never completed
+    >>> zc.async.testing.get_poll(dispatcher, seconds=1)
+    {'': None}
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'ERROR':
+    ...         break
+    ... else:
+    ...     assert False, 'did not find log'
+    ...
+    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    UUID ... already activated in queue  (oid 4): another process?
+    (To stop poll attempts in this process, set
+    ``zc.async.dispatcher.get().activated = False``.  To stop polls
+    permanently, don't start a zc.async.dispatcher!)
+
+
+To speed up the realization of our dispatcher that the previous activation is
+``dead``, we'll set the ping_death_interval to just one second.
+
+    >>> _ = transaction.begin()
+    >>> da.dead
+    False
+    >>> import datetime
+    >>> da.ping_death_interval = datetime.timedelta(seconds=1)
+    >>> da.dead
+    True
+    >>> bool(da.activated)
+    True
+    >>> transaction.commit()
+
+After the next poll, the dispatcher will have cleaned up its old tasks in the
+same way we saw in the previous example. The job's ``fail`` method will be
+called, and the callback will be performed.  The DispatcherAgents object is
+no longer dead, because it is tied to the new instance of the dispatcher.
+
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> _ = transaction.begin()
+    >>> job in da['main']
+    False
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+    >>> fail_job = job.parent
+    >>> fail_job
+    <zc.async.job.Job (oid 78, db 'unnamed') ``zc.async.job.Job (oid 57, db 'unnamed') :fail()``>
+
+Let's see it happen.
+
+    >>> zc.async.testing.wait_for_result(fail_job)
+    >>> job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> job.result
+    <zc.twist.Failure zc.async.interfaces.AbortedError>
+    >>> callback_job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> callback_job.result
+    '...I handled the error...'
+
+The dispatcher cleaned up its own "hard" crash.
+
+[#cleanup2]_
+
+-----------------------------------------------------------------
+Dispatcher Crashes "Hard" While Performing a Job, Sibling Resumes
+-----------------------------------------------------------------
+
+Our next catastrophe is the same as the one before, except, after one
+dispatcher's hard crash, another dispatcher is around to clean up the dead
+jobs.
+
+To show this, we will start a job, start a second dispatcher, simulate the
+first dispatcher dying "hard," and watch the second dispatcher clean up
+after the first one.
+
+So, first we start a long-running job in the dispatcher as before.
+
+    >>> lock.acquire()
+    True
+    >>> job = queue.put(wait_for_me)
+    >>> callback_job = job.addCallbacks(failure=handle_error)
+    >>> transaction.commit()
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> wait_for_start(job)
+
+Now we'll start up an alternate dispatcher.
+
+    >>> import uuid
+    >>> alt_uuid = uuid.uuid1()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...     poll_interval=0.5, uuid=alt_uuid)(
+    ...     zc.async.interfaces.DatabaseOpened(db))
+    >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
+
+We're also going to set the main dispatcher's ``ping_death_interval`` back to
+60 seconds so we can see some polls in the alternate dispatcher before it gets
+around to cleaning up.
+
+    >>> da.ping_death_interval = datetime.timedelta(seconds=60)
+    >>> transaction.commit()
+
+Now we'll "crash" the dispatcher.
+
+    >>> dispatcher.activated = False # this will make polling stop, without
+    ...                              # cleanup
+    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
+    >>> dispatcher.thread.join(3)
+
+As discussed in the previous example, the polling hasn't timed out yet, so the
+alternate dispatcher can't know that the first one is dead. Therefore, the job
+is still sitting around in the old dispatcher's pile in the database.
+
+    >>> _ = transaction.begin()
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+    >>> alt_poll_1 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> _ = transaction.begin()
+    >>> job in da['main']
+    True
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+    >>> alt_poll_2 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> _ = transaction.begin()
+    >>> job in da['main']
+    True
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    False
+
+Above, the ping_death_interval was returned to the default of 60 seconds. To
+speed up the realization of our second dispatcher that the first one is dead,
+we'll set the ping_death_interval back down to just one second.
+
+    >>> da.ping_death_interval
+    datetime.timedelta(0, 60)
+    >>> import datetime
+    >>> da.ping_death_interval = datetime.timedelta(seconds=1)
+    >>> da.dead
+    True
+    >>> bool(da.activated)
+    True
+    >>> transaction.commit()
+
+After the second dispatcher gets a poll--a chance to notice--it will have
+cleaned up the first dispatcher's old tasks in the same way we saw in the
+previous example.  The job's ``fail`` method will be called, and the callback
+will be performed.
+
+    >>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
+    >>> _ = transaction.begin()
+    >>> job in da['main']
+    False
+    >>> bool(da.activated)
+    False
+    >>> da.dead
+    True
+    >>> fail_job = job.parent
+    >>> fail_job
+    <zc.async.job.Job (oid 121, db 'unnamed') ``zc.async.job.Job (oid 84, db 'unnamed') :fail()``>
+    >>> zc.async.testing.wait_for_result(fail_job)
+    >>> job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> job.result
+    <zc.twist.Failure zc.async.interfaces.AbortedError>
+    >>> callback_job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> callback_job.result
+    '...I handled the error...'
+
+The sibling, then, was able to clean up the mess left by the "hard" crash of
+the first dispatcher.
+
+[#cleanup3]_
+
 --------------
 Callback Fails
 --------------
@@ -143,9 +407,10 @@
 There are some catastrophes from which there are no easy fixes like these.  For
 instance, imagine you have communicated with an external system, and gotten a
 reply that you have successfully made a transaction there, but then the
-dispatcher dies, or the database disappears, before you have a chance to redo.
+dispatcher dies, or the database disappears, before you have a chance to commit
+the local transaction recording the success.  Your code needs to see that
 
-[#last_cleanup]_
+...multidatabase...
 
 .. ......... ..
 .. Footnotes ..
@@ -199,7 +464,29 @@
     >>> old_dispatcher.thread.join(3)
     >>> old_dispatcher.dead_pools[0].threads[0].join(3)
 
-.. [#last_cleanup]
+.. [#cleanup2]
 
-    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+    >>> lock.release()
+    >>> old_dispatcher.thread.join(3)
+    >>> for queue_pools in old_dispatcher.queues.values():
+    ...     for name, pool in queue_pools.items():
+    ...         pool.setSize(0)
+    ...         for thread in pool.threads:
+    ...             thread.join(3)
+    ...
+    -3
+
+.. [#cleanup3]
+
+    >>> lock.release()
     >>> dispatcher.thread.join(3)
+    >>> for queue_pools in dispatcher.queues.values():
+    ...     for name, pool in queue_pools.items():
+    ...         pool.setSize(0)
+    ...         for thread in pool.threads:
+    ...             thread.join(3)
+    ...
+    -3
+    >>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
+    >>> alt_dispatcher.thread.join(3)
+    >>> alt_dispatcher.dead_pools[0].threads[0].join(3)

Modified: zc.async/trunk/src/zc/async/subscribers.py
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.py	2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/subscribers.py	2008-05-10 01:37:38 UTC (rev 86598)
@@ -80,9 +80,13 @@
 class ThreadedDispatcherInstaller(object):
     def __init__(self,
                  poll_interval=5,
-                 reactor_factory=twisted.internet.selectreactor.SelectReactor):
+                 reactor_factory=twisted.internet.selectreactor.SelectReactor,
+                 uuid=None): # optional uuid is really just for tests; see
+                             # catastrophes.txt, for instance, which runs
+                             # two dispatchers simultaneously.
         self.poll_interval = poll_interval
         self.reactor_factory = reactor_factory
+        self.uuid = uuid
         # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
         # package is around
         zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
@@ -90,7 +94,8 @@
     def __call__(self, ev):
         reactor = self.reactor_factory()
         dispatcher = zc.async.dispatcher.Dispatcher(
-            ev.database, reactor, poll_interval=self.poll_interval)
+            ev.database, reactor, poll_interval=self.poll_interval,
+            uuid=self.uuid)
         def start():
             dispatcher.activate()
             reactor.run(installSignalHandlers=0)

Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py	2008-05-10 00:27:54 UTC (rev 86597)
+++ zc.async/trunk/src/zc/async/testing.py	2008-05-10 01:37:38 UTC (rev 86598)
@@ -183,18 +183,18 @@
 
 # helper functions convenient for tests
 
-def get_poll(dispatcher, count=None):
+def get_poll(dispatcher, count=None, seconds=6):
     if count is None:
         count = len(dispatcher.polls)
-    for i in range(60):
+    for i in range(seconds * 10):
         if len(dispatcher.polls) > count:
             return dispatcher.polls.first()
         time.sleep(0.1)
     else:
         assert False, 'no poll!'
 
-def wait_for_result(job):
-    for i in range(60):
+def wait_for_result(job, seconds=6):
+    for i in range(seconds * 10):
         t = transaction.begin()
         if job.status == zc.async.interfaces.COMPLETED:
             return job.result



More information about the Checkins mailing list