[Checkins] SVN: zc.async/trunk/src/zc/async/catastrophes_revisited.txt add missing file from last checkin :-(
Gary Poster
gary at modernsongs.com
Fri Sep 5 20:35:05 EDT 2008
Log message for revision 90912:
add missing file from last checkin :-(
Changed:
A zc.async/trunk/src/zc/async/catastrophes_revisited.txt
-=-
Added: zc.async/trunk/src/zc/async/catastrophes_revisited.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes_revisited.txt (rev 0)
+++ zc.async/trunk/src/zc/async/catastrophes_revisited.txt 2008-09-06 00:35:04 UTC (rev 90912)
@@ -0,0 +1,172 @@
+Catastrophes Revisited
+======================
+
+The examples in :ref:`recovering-from-catastrophes` show how various system
+catastrophes are handled by the async system.
+
+This document discusses a problem that can be caused by the mechanism used to
+solve one of those catastrophes.
+
+One of the last catastrophes discussed in the other document is the
+:ref:`hard-crash-with-sibling-recovery`. A central mechanism for this
+recovery is that polls are recorded in the database as a "ping" after a
+certain duration, specified on a ``DispatcherAgents`` object as the
+``ping_interval``. The ping's datetime is recorded as the ``last_ping``.
+If the ``last_ping`` plus the ``DispatcherAgents`` object's
+``ping_death_interval`` is earlier than now, the dispatcher is considered
+``dead``. Sibling processes are then expected to clean out the agents of
+any dispatcher they recognize as dead.
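+
+For illustration, the deadness test amounts to a simple datetime comparison.
+The following is a minimal sketch of that idea, not zc.async's actual code;
+the ``seems_dead`` helper is hypothetical::
+
+    import datetime
+
+    def seems_dead(last_ping, ping_death_interval, now=None):
+        # A dispatcher is presumed dead when its last recorded ping is
+        # older than the allowed interval.
+        if now is None:
+            now = datetime.datetime.utcnow()
+        return last_ping + ping_death_interval < now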
+
+A serious problem can occur if a dispatcher appears to be dead but is not.
+This can happen, for instance, if one or more long-running commits lock out
+the commit that would record a poll (commits to the database are serialized).
+In this case, jobs may be removed from an agent while the agent's dispatcher
+is still working on them. The jobs may even be assigned to another agent, so
+that two processes are working on them simultaneously.
+
+Within the old process, the first clue of the problem comes at commit time:
+a write conflict fires the first time the old process tries to commit,
+because the job's status was changed underneath it while the process was
+trying to change the job itself (either the status or the result).
+
+The old process should not retry. The new process, if any, is now
+responsible for this job.
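+
+To see why the conflict fires, here is a rough standalone sketch (again, not
+zc.async's actual code) of two ZODB connections changing the same persistent
+object; the second committer gets a ``ConflictError`` and aborts rather than
+retrying::
+
+    import transaction
+    from persistent.mapping import PersistentMapping
+    from ZODB.DB import DB
+    from ZODB.DemoStorage import DemoStorage
+    from ZODB.POSException import ConflictError
+
+    demo_db = DB(DemoStorage())
+    tm_new = transaction.TransactionManager()  # the "new" process
+    tm_old = transaction.TransactionManager()  # the "old" process
+    conn_new = demo_db.open(transaction_manager=tm_new)
+    conn_old = demo_db.open(transaction_manager=tm_old)
+
+    conn_new.root()['job'] = PersistentMapping({'status': 'active'})
+    tm_new.commit()
+    conn_old.sync()  # both connections now see the job
+
+    conn_new.root()['job']['status'] = 'completed'  # changed underneath...
+    conn_old.root()['job']['status'] = 'failed'     # ...the old process
+    tm_new.commit()
+    try:
+        tm_old.commit()  # conflicts: the job changed since conn_old read it
+    except ConflictError:
+        tm_old.abort()  # the old process should not retry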
+
+Let's look at an example of this. First, some setup; then, as before, we
+start a long-running job in the dispatcher.
+
+ >>> import ZODB.FileStorage
+ >>> storage = ZODB.FileStorage.FileStorage(
+ ...     'main.fs', create=True)
+ >>> from ZODB.DB import DB
+ >>> db = DB(storage)
+ >>> conn = db.open()
+ >>> root = conn.root()
+ >>> import zc.async.configure
+ >>> zc.async.configure.base()
+ >>> import zc.async.subscribers
+ >>> import zope.component
+ >>> zope.component.provideHandler(zc.async.subscribers.queue_installer)
+ >>> zope.component.provideHandler(
+ ...     zc.async.subscribers.ThreadedDispatcherInstaller(
+ ...         poll_interval=0.1))
+ >>> zope.component.provideHandler(zc.async.subscribers.agent_installer)
+ >>> import zope.event
+ >>> import zc.async.interfaces
+ >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+ >>> import transaction
+ >>> _ = transaction.begin()
+
+ >>> import zope.component
+ >>> import transaction
+ >>> import zc.async.interfaces
+ >>> import zc.async.testing
+ >>> import zc.async.dispatcher
+ >>> import threading
+
+ >>> queue = root[zc.async.interfaces.KEY]['']
+ >>> lock = threading.Lock()
+ >>> lock.acquire()
+ True
+ >>> def wait_for_me():
+ ...     lock.acquire()
+ ...     return 42
+ ...
+ >>> job = queue.put(wait_for_me)
+ >>> transaction.commit()
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> zc.async.testing.wait_for_start(job)
+ >>> import ZODB.utils
+ >>> dispatcher.getJobInfo(ZODB.utils.u64(job._p_oid))['reassigned']
+ False
+
+Now we'll set the ``ping_death_interval`` to an unreasonably short value
+(shorter than the ``poll_interval``) so that the dispatcher will appear to
+be dead.
+
+ >>> import zc.async.instanceuuid
+ >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+ >>> import datetime
+ >>> da.ping_death_interval = datetime.timedelta(seconds=0.01)
+ >>> transaction.commit()
+
+Now we'll start up an alternate dispatcher, polling *really* fast.
+
+ >>> import uuid
+ >>> alt_uuid = uuid.uuid1()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ...     poll_interval=0.01, uuid=alt_uuid)(
+ ...     zc.async.interfaces.DatabaseOpened(db))
+ >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
+ >>> poll = zc.async.testing.get_poll(alt_dispatcher)
+
+Soon, the job should be in both agents.
+
+ >>> ignore = transaction.begin()
+ >>> alt_da = queue.dispatchers[alt_uuid]
+ >>> alt_agent = alt_da['main']
+ >>> from time import sleep as time_sleep
+ >>> for i in range(10):
+ ...     ignore = transaction.begin()
+ ...     if len(alt_agent):
+ ...         break
+ ...     time_sleep(0.1)
+ ... else:
+ ...     print 'timed out!'
+ ...
+ >>> import ZODB.utils
+ >>> if ZODB.utils.u64(list(alt_agent)[0]._p_oid) == (
+ ...         dispatcher.getActiveJobIds()[0][0]):
+ ...     print "matches"
+ ... else:
+ ...     print ('no match:',
+ ...            ZODB.utils.u64(list(alt_agent)[0]._p_oid),
+ ...            dispatcher.getActiveJobIds()[0][0])
+ ...
+ matches
+ >>> zc.async.testing.wait_for_start(job)
+
+Note that the dispatcher's ``getJobInfo`` method will recognize that the job
+has been reassigned (or at least removed) once the dispatcher polls again.
+
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> dispatcher.getJobInfo(ZODB.utils.u64(job._p_oid))['reassigned']
+ True
+
+Now we'll let the first thread working on the job complete. Its commit will
+fail, emptying the agent and logging the problem (``event_logs``, used below,
+is a log handler installed by the test setup).
+
+ >>> lock.release()
+ >>> for i in range(10):
+ ...     if not len(dispatcher.getActiveJobIds()):
+ ...         break
+ ...     time_sleep(0.1)
+ ... else:
+ ...     print 'timed out!'
+ ...
+ >>> for r in reversed(event_logs.records):
+ ...     if r.levelname == 'ERROR':
+ ...         break
+ ... else:
+ ...     assert False, 'did not find log'
+ ...
+ >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ <...Job ...> was reassigned. Likely cause was that polling was unable to
+ occur as regularly as expected, perhaps because of long commit times in
+ the application.
+
+ >>> dispatcher.getActiveJobIds()
+ []
+
+Now we'll let the job run to completion in the new dispatcher.
+
+ >>> lock.release()
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+
+Q.E.D.
+
+Now we'll shut down the dispatchers.
+
+ >>> zc.async.testing.shut_down_and_wait(dispatcher)
+ >>> zc.async.testing.shut_down_and_wait(alt_dispatcher)
Property changes on: zc.async/trunk/src/zc/async/catastrophes_revisited.txt
___________________________________________________________________
Name: svn:eol-style
+ native