[Checkins] SVN: zc.async/trunk/src/zc/async/catastrophes_revisited.txt add missing file from last checkin :-(

Gary Poster gary at modernsongs.com
Fri Sep 5 20:35:05 EDT 2008


Log message for revision 90912:
  add missing file from last checkin :-(

Changed:
  A   zc.async/trunk/src/zc/async/catastrophes_revisited.txt

-=-
Added: zc.async/trunk/src/zc/async/catastrophes_revisited.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes_revisited.txt	                        (rev 0)
+++ zc.async/trunk/src/zc/async/catastrophes_revisited.txt	2008-09-06 00:35:04 UTC (rev 90912)
@@ -0,0 +1,172 @@
+Catastrophes Revisited
+======================
+
+The examples in :ref:`recovering-from-catastrophes` show how various system
+catastrophes are handled by the async system.
+
+This document discusses a problem that can be caused by the mechanism used to
+solve one of those catastrophes.
+
+One of the last catastrophes discussed in the other document is the
+:ref:`hard-crash-with-sibling-recovery`.  A central mechanism in this
+recovery is that polls are recorded to the database as "pings" after a
+certain duration, specified on a ``DispatcherAgents`` object as its
+``ping_interval``.  The datetime of the most recent ping is recorded as the
+``last_ping``.  If the ``last_ping`` plus the ``ping_death_interval`` on the
+``DispatcherAgents`` object is less than now, the dispatcher is considered
+``dead``.  Sibling processes are then expected to clean out agents that they
+recognize as dead.
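+
+For illustration, the liveness test amounts to a comparison like the
+following.  This is a minimal sketch of the rule described above, not the
+actual ``DispatcherAgents`` implementation::
+
+    import datetime
+
+    def appears_dead(last_ping, ping_death_interval, now=None):
+        # A dispatcher is declared dead when its last recorded ping,
+        # plus the allowed death interval, falls before the current time.
+        if now is None:
+            now = datetime.datetime.utcnow()
+        return last_ping + ping_death_interval < now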
+
+A serious problem can occur if an agent appears to be dead but is not.  This
+can happen, for instance, if one or more long-running commits lock out the
+commit that records a poll (database commits are serial).  In this case, jobs
+may be removed from an agent while the agent's dispatcher is still working on
+them.  The jobs may even be reassigned to another agent, so that two agents
+are working on them simultaneously.
+
+The first clue of a problem within the old process comes at commit time.  A
+write conflict will fire the first time the old process tries to commit,
+because the job's status changed underneath it while it was trying to change
+the job (either the status or the result).
+
+The old process should not retry.  The new process, if any, is responsible
+for the job now.
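+
+In ZODB terms, the failure in the old process looks roughly like the sketch
+below.  The names here (``record_result`` and the ``result`` attribute) are
+illustrative, not the real zc.async internals::
+
+    import transaction
+    from ZODB.POSException import ConflictError
+
+    def record_result(job, result):
+        # The old process tries to record its (now stale) result.
+        job.result = result
+        try:
+            transaction.commit()
+        except ConflictError:
+            # Do not retry: the job has been reassigned, and the new
+            # process is responsible for it now.
+            transaction.abort()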
+
+Let's look at an example of this.  First, some setup.
+
+We start a long-running job in the dispatcher, as before.
+
+    >>> import ZODB.FileStorage
+    >>> storage = ZODB.FileStorage.FileStorage(
+    ...     'main.fs', create=True)
+    >>> from ZODB.DB import DB
+    >>> db = DB(storage)
+    >>> conn = db.open()
+    >>> root = conn.root()
+    >>> import zc.async.configure
+    >>> zc.async.configure.base()
+    >>> import zc.async.subscribers
+    >>> import zope.component
+    >>> zope.component.provideHandler(zc.async.subscribers.queue_installer)
+    >>> zope.component.provideHandler(
+    ...     zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.1))
+    >>> zope.component.provideHandler(zc.async.subscribers.agent_installer)
+    >>> import zope.event
+    >>> import zc.async.interfaces
+    >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+    >>> import transaction
+    >>> _ = transaction.begin()
+
+    >>> import zc.async.testing
+    >>> import zc.async.dispatcher
+    >>> import threading
+
+    >>> queue = root[zc.async.interfaces.KEY]['']
+    >>> lock = threading.Lock()
+    >>> lock.acquire()
+    True
+    >>> def wait_for_me():
+    ...     lock.acquire() # block until the test releases the lock
+    ...     return 42
+    ...
+    >>> job = queue.put(wait_for_me)
+    >>> transaction.commit()
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> zc.async.testing.wait_for_start(job)
+    >>> import ZODB.utils
+    >>> dispatcher.getJobInfo(ZODB.utils.u64(job._p_oid))['reassigned']
+    False
+
+Now we'll set the ``ping_death_interval`` to a too-short value (shorter than
+the ``poll_interval``) so that the dispatcher will appear to be dead.
+
+    >>> import zc.async.instanceuuid
+    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+    >>> import datetime
+    >>> da.ping_death_interval = datetime.timedelta(seconds=0.01)
+    >>> transaction.commit()
+
+Now we'll start up an alternate dispatcher, polling *really* fast.
+
+    >>> import uuid
+    >>> alt_uuid = uuid.uuid1()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...     poll_interval=0.01, uuid=alt_uuid)(
+    ...     zc.async.interfaces.DatabaseOpened(db))
+    >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
+    >>> poll = zc.async.testing.get_poll(alt_dispatcher)
+
+Soon, the job should be in both agents.
+
+    >>> ignore = transaction.begin()
+    >>> alt_da = queue.dispatchers[alt_uuid]
+    >>> alt_agent = alt_da['main']
+    >>> from time import sleep as time_sleep
+    >>> for i in range(10):
+    ...     ignore = transaction.begin()
+    ...     if len(alt_agent):
+    ...         break
+    ...     time_sleep(0.1)
+    ... else:
+    ...     print 'timed out!'
+    ...
+    >>> if ZODB.utils.u64(list(alt_agent)[0]._p_oid) == (
+    ...     dispatcher.getActiveJobIds()[0][0]):
+    ...     print "matches"
+    ... else:
+    ...     print ('no match:',
+    ...            ZODB.utils.u64(list(alt_agent)[0]._p_oid),
+    ...            dispatcher.getActiveJobIds()[0][0])
+    ...
+    matches
+    >>> zc.async.testing.wait_for_start(job)
+
+Note that the dispatcher's ``getJobInfo`` method will recognize that the job
+has been reassigned (or at least removed) once it polls.
+
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> dispatcher.getJobInfo(ZODB.utils.u64(job._p_oid))['reassigned']
+    True
+
+Now we'll let the first thread working on the job complete.  It will fail,
+emptying the agent and logging the problem.
+
+    >>> lock.release()
+    >>> for i in range(10):
+    ...     if not len(dispatcher.getActiveJobIds()):
+    ...         break
+    ...     time_sleep(0.1)
+    ... else:
+    ...     print 'timed out!'
+    ...
+    >>> for r in reversed(event_logs.records):
+    ...     if r.levelname == 'ERROR':
+    ...         break
+    ... else:
+    ...     assert False, 'did not find log'
+    ...
+    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    <...Job ...> was reassigned.  Likely cause was that polling was unable to
+    occur as regularly as expected, perhaps because of long commit times in
+    the application.
+
+    >>> dispatcher.getActiveJobIds()
+    []
+
+Now we'll let the job run to completion in the new dispatcher.
+
+    >>> lock.release()
+    >>> zc.async.testing.wait_for_result(job)
+    42
+
+Q.E.D.
+
+Now we'll shut down the dispatchers.
+
+    >>> zc.async.testing.shut_down_and_wait(dispatcher)
+    >>> zc.async.testing.shut_down_and_wait(alt_dispatcher)


Property changes on: zc.async/trunk/src/zc/async/catastrophes_revisited.txt
___________________________________________________________________
Name: svn:eol-style
   + native


