[Checkins] SVN: zc.async/trunk/s checkpoint: first "catastrophe"
example with fix.
Gary Poster
gary at zope.com
Fri May 9 17:26:03 EDT 2008
Log message for revision 86589:
checkpoint: first "catastrophe" example with fix.
Changed:
U zc.async/trunk/setup.py
U zc.async/trunk/src/zc/async/CHANGES.txt
U zc.async/trunk/src/zc/async/README.txt
U zc.async/trunk/src/zc/async/TODO.txt
U zc.async/trunk/src/zc/async/agent.py
A zc.async/trunk/src/zc/async/catastrophes.txt
U zc.async/trunk/src/zc/async/dispatcher.py
U zc.async/trunk/src/zc/async/job.py
U zc.async/trunk/src/zc/async/tests.py
U zc.async/trunk/src/zc/async/utils.py
-=-
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/setup.py 2008-05-09 21:26:03 UTC (rev 86589)
@@ -72,7 +72,7 @@
setup(
name='zc.async',
- version='1.1',
+ version='1.2',
packages=find_packages('src'),
package_dir={'':'src'},
zip_safe=False,
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/CHANGES.txt 2008-05-09 21:26:03 UTC (rev 86589)
@@ -6,7 +6,10 @@
- converted all reports from the dispatcher, including the monitor output,
to use "unpacked" integer oids. This addresses a problem that simplejson
was having in trying to interpret the packed string blobs as unicode, and
- then making zc.ngi fall over.
+ then making zc.ngi fall over. To get the object, then, you'll need to
+ use ``ZODB.utils.p64``, like this:
+ ``connection.get(ZODB.utils.p64(INTEGER_OID))``, where ``INTEGER_OID``
+ indicates the integer oid of the object you want to examine.
- added several more tests for the monitor code.
@@ -18,6 +21,15 @@
Fixed, which also means we need a new version of zope.bforest (1.2) for a new
feature there.
+- made the log for finding an activated agent report the pertinent queue's oid
+ as an unpacked integer, rather than the packed string blob. As mentioned
+ above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
+ recognize.
+
+- Bugfix: in failing a job, the job thought it was in agent, and the ``fail``
+ call failed. This is now tested by the first example in the new doctest
+ ``catastrophes.txt``.
+
1.1 (2008-04-24)
================
Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/README.txt 2008-05-09 21:26:03 UTC (rev 86589)
@@ -784,13 +784,14 @@
>>> job = queue.put(main_job)
>>> transaction.commit()
- >>> reactor.wait_for(job, attempts=3)
- TIME OUT
- >>> reactor.wait_for(job, attempts=3)
- TIME OUT
- >>> reactor.wait_for(job, attempts=3)
- TIME OUT
- >>> reactor.wait_for(job, attempts=3)
+ >>> for i in range(10):
+ ... reactor.wait_for(job, attempts=3)
+ ... if job.status == zc.async.interfaces.COMPLETED:
+ ... break
+ ... else:
+ ... assert False, 'never completed'
+ ... # doctest: +ELLIPSIS
+ TIME OUT...
>>> job.result
42
@@ -1212,7 +1213,7 @@
'shortest successful': (..., 'unnamed'),
'started': 22,
'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
- 'statistics start': datetime.datetime(2006, 8, 10, 15, 57, 47, 211),
+ 'statistics start': datetime.datetime(2006, 8, 10, 15, ...),
'successful': 20,
'unknown': 0}
>>> reactor.stop()
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/TODO.txt 2008-05-09 21:26:03 UTC (rev 86589)
@@ -1,168 +1,11 @@
Bugs and improvements:
-- in failing a task, jobs are not in agent:
- <zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``> failed with traceback:
- Failure: zc.async.interfaces.AbortedError:
-
- <zc.async.job.Job (oid 33532777, db '') ``zc.async.job.Job (oid 33530083, db '') :fail()``> failed with traceback:
- *--- Failure #7 (pickled) ---
- /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py:299: _call_with_retry(...)
- [ Locals ]
- res : 'None'
- self : "<zc.async.job.Job (oid 33532777, db '') ``zc.async.job.Job (oid 33530083, db '') :fail()``>"
- tm : '<transaction._manager.ThreadTransactionManager object at 0xb7a5ea4c>'
- call : '<function <lambda> at 0xb686fd4c>'
- ct : '0'
- ( Globals )
- success_or_failure : '<function success_or_failure at 0xb6cb917c>'
- __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.pyc'"
- persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
- zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
- pytz : "<module 'pytz' from '/opt/z4m/eggs/pytz-2008a-py2.4.egg/pytz/__init__.py'>"
- __name__ : "'zc.async.job'"
- datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
- types : "<module 'types' from '/opt/cleanpython24/lib/python2.4/types.pyc'>"
- transaction : "<module 'transaction' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/__init__.pyc'>"
- completeStartedJobArguments : '<function completeStartedJobArguments at 0xb6cb91b4>'
- twisted : "<module 'twisted' from '/opt/z4m/eggs/zope.app.twisted-3.4.0-py2.4.egg/twisted/__init__.pyc'>"
- _repr : '<function _repr at 0xb6cb9144>'
- Job : "<class 'zc.async.job.Job'>"
- ZODB : "<module 'ZODB' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/__init__.pyc'>"
- rwproperty : "<module 'rwproperty' from '/opt/z4m/eggs/rwproperty-1.0-py2.4.egg/rwproperty.pyc'>"
- zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
- BTrees : "<module 'BTrees' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/BTrees/__init__.pyc'>"
- __doc__ : 'None'
- /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py:291: <lambda>(...)
- [ Locals ]
- self : "<zc.async.job.Job (oid 33532777, db '') ``zc.async.job.Job (oid 33530083, db '') :fail()``>"
- effective_args : '[]'
- effective_kwargs : '{}'
- ( Globals )
- success_or_failure : '<function success_or_failure at 0xb6cb917c>'
- __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.pyc'"
- persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
- zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
- pytz : "<module 'pytz' from '/opt/z4m/eggs/pytz-2008a-py2.4.egg/pytz/__init__.py'>"
- __name__ : "'zc.async.job'"
- datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
- types : "<module 'types' from '/opt/cleanpython24/lib/python2.4/types.pyc'>"
- transaction : "<module 'transaction' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/__init__.pyc'>"
- completeStartedJobArguments : '<function completeStartedJobArguments at 0xb6cb91b4>'
- twisted : "<module 'twisted' from '/opt/z4m/eggs/zope.app.twisted-3.4.0-py2.4.egg/twisted/__init__.pyc'>"
- _repr : '<function _repr at 0xb6cb9144>'
- Job : "<class 'zc.async.job.Job'>"
- ZODB : "<module 'ZODB' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/__init__.pyc'>"
- rwproperty : "<module 'rwproperty' from '/opt/z4m/eggs/rwproperty-1.0-py2.4.egg/rwproperty.pyc'>"
- zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
- BTrees : "<module 'BTrees' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/BTrees/__init__.pyc'>"
- __doc__ : 'None'
- /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py:362: fail(...)
- [ Locals ]
- self : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
- e : '<zc.async.interfaces.AbortedError instance at 0xb07fa7ac>'
- ( Globals )
- success_or_failure : '<function success_or_failure at 0xb6cb917c>'
- __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.pyc'"
- persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
- zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
- pytz : "<module 'pytz' from '/opt/z4m/eggs/pytz-2008a-py2.4.egg/pytz/__init__.py'>"
- __name__ : "'zc.async.job'"
- datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
- types : "<module 'types' from '/opt/cleanpython24/lib/python2.4/types.pyc'>"
- transaction : "<module 'transaction' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/__init__.pyc'>"
- completeStartedJobArguments : '<function completeStartedJobArguments at 0xb6cb91b4>'
- twisted : "<module 'twisted' from '/opt/z4m/eggs/zope.app.twisted-3.4.0-py2.4.egg/twisted/__init__.pyc'>"
- _repr : '<function _repr at 0xb6cb9144>'
- Job : "<class 'zc.async.job.Job'>"
- ZODB : "<module 'ZODB' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/__init__.pyc'>"
- rwproperty : "<module 'rwproperty' from '/opt/z4m/eggs/rwproperty-1.0-py2.4.egg/rwproperty.pyc'>"
- zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
- BTrees : "<module 'BTrees' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/BTrees/__init__.pyc'>"
- __doc__ : 'None'
- /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py:396: resumeCallbacks(...)
- [ Locals ]
- callbacks : '[]'
- length : '1'
- tm : '<transaction._manager.ThreadTransactionManager object at 0xb7a5ea4c>'
- self : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
- j : "<zc.async.job.Job (oid 33530088, db '') ``zc.z4m.content.query.catalog.addProcessingTask(zc.async.queue.Queue (oid 33148381, db ''), delay=datetime.timedelta(0, 600))``>"
- ( Globals )
- success_or_failure : '<function success_or_failure at 0xb6cb917c>'
- __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.pyc'"
- persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
- zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
- pytz : "<module 'pytz' from '/opt/z4m/eggs/pytz-2008a-py2.4.egg/pytz/__init__.py'>"
- __name__ : "'zc.async.job'"
- datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
- types : "<module 'types' from '/opt/cleanpython24/lib/python2.4/types.pyc'>"
- transaction : "<module 'transaction' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/__init__.pyc'>"
- completeStartedJobArguments : '<function completeStartedJobArguments at 0xb6cb91b4>'
- twisted : "<module 'twisted' from '/opt/z4m/eggs/zope.app.twisted-3.4.0-py2.4.egg/twisted/__init__.pyc'>"
- _repr : '<function _repr at 0xb6cb9144>'
- Job : "<class 'zc.async.job.Job'>"
- ZODB : "<module 'ZODB' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/__init__.pyc'>"
- rwproperty : "<module 'rwproperty' from '/opt/z4m/eggs/rwproperty-1.0-py2.4.egg/rwproperty.pyc'>"
- zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
- BTrees : "<module 'BTrees' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/BTrees/__init__.pyc'>"
- __doc__ : 'None'
- /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.py:77: jobCompleted(...)
- [ Locals ]
- job : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
- self : '<zc.async.agent.Agent object at 0xb07e0d2c>'
- ( Globals )
- __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.pyc'"
- persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
- Agent : "<class 'zc.async.agent.Agent'>"
- datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
- chooseFirst : '<function chooseFirst at 0xb6700d4c>'
- addMainAgentActivationHandler : '<function addMainAgentActivationHandler at 0xb6700dbc>'
- zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
- __name__ : "'zc.async.agent'"
- zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
- __doc__ : 'None'
- /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.py:61: remove(...)
- [ Locals ]
- item : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
- self : '<zc.async.agent.Agent object at 0xb07e0d2c>'
- ( Globals )
- __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.pyc'"
- persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
- Agent : "<class 'zc.async.agent.Agent'>"
- datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
- chooseFirst : '<function chooseFirst at 0xb6700d4c>'
- addMainAgentActivationHandler : '<function addMainAgentActivationHandler at 0xb6700dbc>'
- zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
- __name__ : "'zc.async.agent'"
- zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
- __doc__ : 'None'
- /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.py:58: index(...)
- [ Locals ]
- i : "<zc.async.job.Job (oid 33532777, db '') ``zc.async.job.Job (oid 33530083, db '') :fail()``>"
- ix : '0'
- self : '<zc.async.agent.Agent object at 0xb07e0d2c>'
- item : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
- ( Globals )
- __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.pyc'"
- persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
- Agent : "<class 'zc.async.agent.Agent'>"
- datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
- chooseFirst : '<function chooseFirst at 0xb6700d4c>'
- addMainAgentActivationHandler : '<function addMainAgentActivationHandler at 0xb6700dbc>'
- zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
- __name__ : "'zc.async.agent'"
- zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
- __doc__ : 'None'
- exceptions.ValueError: <zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``> not in Agent
- *--- End of Failure #7 ---
-
-- try to make this look less frightening:
-
- 2008-05-08T13:05:02 ERROR zc.async.events UUID ecbad1cc-1a89-11dd-8f17-0015c5e8367a already activated in queue (oid ???): another process? To stop poll attempts in this process, set ``zc.async.dispatcher.get().activated = False``. To stop polls permanently, don't start a zc.async.dispatcher!
-
- need retry tasks, particularly for callbacks
- need CRITICAL logs for callbacks
+- when database went away
+
For future versions:
- Write the z3monitor tests.
Modified: zc.async/trunk/src/zc/async/agent.py
===================================================================
--- zc.async/trunk/src/zc/async/agent.py 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/agent.py 2008-05-09 21:26:03 UTC (rev 86589)
@@ -48,7 +48,7 @@
if self.parent is not None:
return self.parent.parent
- for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'pull'):
+ for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__'):
locals()[nm] = zc.async.utils.simpleWrapper(nm)
def index(self, item):
@@ -58,11 +58,16 @@
raise ValueError("%r not in %s" % (item, self.__class__.__name__))
def remove(self, item):
- del self[self.index(item)]
+ self.pull(self.index(item))
def __delitem__(self, ix):
- self._data.pull(ix)
+ self.pull(ix)
+ def pull(self, index=0):
+ res = self._data.pull(index)
+ res.parent = None
+ return res
+
def claimJob(self):
if len(self._data) < self.size:
res = self.chooser(self)
Added: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt (rev 0)
+++ zc.async/trunk/src/zc/async/catastrophes.txt 2008-05-09 21:26:03 UTC (rev 86589)
@@ -0,0 +1,205 @@
+Catastrophes
+============
+
+Sometimes bad things happen in the course of processing tasks. You might have
+a MemoryError while processing your main job, or some other failure might
+happen. That's bad enough. Of course, you can register some callbacks to
+handle the error, to do what you need to recover.
+
+But then what if the callback itself fails? Perhaps the situation that caused
+the main job to fail with a MemoryError will let the callback start, but not
+complete. Then when a sibling dispatcher handles the incomplete job, the
+callback will fail.
+
+You, the user, do have some responsibilities. Callbacks should be very fast and
+light. If you want to do something that takes a long time, or might take a long
+time, have your callback put a new job in a queue for the long job. The
+callback itself should then complete quickly, getting out of the way.
+
+However, zc.async also has important responsibilities.
+
+This document examines catastrophes like the ones outlined above, to show how
+zc.async handles them, and how you can configure zc.async for these situations.
+Other documents in zc.async, such as the "Dead Dispatchers" section of
+queue.txt, look at this with some isolation and stubs; this document uses a
+complete zc.async setup to examine the system holistically [#setUp]_.
+
+These are the scenarios we'll contemplate:
+
+- The system has a single dispatcher. The dispatcher is working on a job with a
+ callback. The dispatcher dies, and then restarts, cleaning up.
+
+- The system has two dispatchers. One dispatcher is working on a job with a
+ callback, and then dies. The other dispatcher cleans up.
+
+- The system has a single dispatcher. The dispatcher is working on a job, and
+ successfully completes it. The callback begins, and then fails.
+
+- The system has a single dispatcher. The dispatcher is working on a job, and
+ successfully completes it. The callback begins, and then the dispatcher
+ dies.
+
+- The system has a single dispatcher. The database goes away, and then comes
+ back.
+
+-------------------------------------------------
+Dispatcher Dies Gracefully While Performing a Job
+-------------------------------------------------
+
+First let's consider how a failed job with a callback or two is handled when
+the dispatcher dies.
+
+Here we start a job.
+
+ >>> import zope.component
+ >>> import threading
+ >>> import transaction
+ >>> import zc.async.interfaces
+ >>> import zc.async.testing
+ >>> import zc.async.dispatcher
+
+ >>> queue = root[zc.async.interfaces.KEY]['']
+ >>> lock = threading.Lock()
+ >>> lock.acquire()
+ True
+ >>> def wait_for_me():
+ ... lock.acquire()
+ ... lock.release() # so we can use the same lock again later
+ ... raise SystemExit() # this will cause the worker thread to exit
+ ...
+ >>> def handle_error(result):
+ ... return '...I handled the error...'
+ ...
+ >>> job = queue.put(wait_for_me)
+ >>> callback_job = job.addCallbacks(failure=handle_error)
+ >>> transaction.commit()
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> poll = zc.async.testing.get_poll(dispatcher)
+ >>> wait_for_start(job)
+
+In this scenario, ``wait_for_me`` is a job that will "unexpectedly" be lost
+while the dispatcher stops working. ``handle_error`` is the hypothetical
+handler that should be called if the ``wait_for_me`` job fails.
+
+The job has started. Now, the dispatcher suddenly dies without the thread
+performing ``wait_for_me`` getting a chance to finish. For our first example,
+let's give the dispatcher a graceful exit. The dispatcher gets a chance to
+clean up its dispatcher agents, and job.fail() goes into the queue.
+
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ >>> wait_to_deactivate(dispatcher)
+ >>> _ = transaction.begin()
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+ >>> len(queue)
+ 1
+ >>> fail_job = queue[0]
+ >>> fail_job
+ <zc.async.job.Job (oid 51, db 'unnamed') ``zc.async.job.Job (oid 30, db 'unnamed') :fail()``>
+ >>> queue[0].callable
+ <bound method Job.fail of <zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.wait_for_me()``>>
+
+Now when the process starts back up again, our callback will be performed.
+
+ >>> old_dispatcher = dispatcher
+ >>> zc.async.dispatcher.clear()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> zc.async.testing.wait_for_result(fail_job)
+ >>> job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> job.result
+ <zc.twist.Failure zc.async.interfaces.AbortedError>
+ >>> callback_job.status == zc.async.interfaces.COMPLETED
+ True
+ >>> callback_job.result
+ '...I handled the error...'
+
+So, our callback had a chance to do whatever it thought appropriate--in this
+case, simply returning a string--once the dispatcher got back online
+[#cleanup1]_.
+
+--------------------------------------------------------------
+Dispatcher Dies "Hard" While Performing a Job, Sibling Resumes
+--------------------------------------------------------------
+
+--------------
+Callback Fails
+--------------
+
+-------------------------------
+Dispatcher Dies During Callback
+-------------------------------
+
+------------------------------
+Database Disappears For Awhile
+------------------------------
+
+---------------------------------------------
+Other Catastrophes, And Your Responsibilities
+---------------------------------------------
+
+There are some catastrophes for which there are no easy fixes like these. For
+instance, imagine you have communicated with an external system, and gotten a
+reply that you have successfully made a transaction there, but then the
+dispatcher dies, or the database disappears, before you have a chance to redo.
+
+[#last_cleanup]_
+
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
+.. [#setUp]
+
+ >>> import ZODB.FileStorage
+ >>> storage = ZODB.FileStorage.FileStorage(
+ ... 'main.fs', create=True)
+ >>> from ZODB.DB import DB
+ >>> db = DB(storage)
+ >>> conn = db.open()
+ >>> root = conn.root()
+ >>> import zc.async.configure
+ >>> zc.async.configure.base()
+ >>> import zc.async.subscribers
+ >>> import zope.component
+ >>> zope.component.provideHandler(zc.async.subscribers.queue_installer)
+ >>> zope.component.provideHandler(
+ ... zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.5))
+ >>> zope.component.provideHandler(zc.async.subscribers.agent_installer)
+ >>> import zope.event
+ >>> import zc.async.interfaces
+ >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+ >>> import transaction
+ >>> _ = transaction.begin()
+
+ >>> import time
+ >>> def wait_for_start(job):
+ ... for i in range(60):
+ ... t = transaction.begin()
+ ... if job.status == zc.async.interfaces.ACTIVE:
+ ... break
+ ... time.sleep(0.1)
+ ... else:
+ ... assert False, 'job never started'
+
+ >>> def wait_to_deactivate(dispatcher):
+ ... for i in range(60):
+ ... if dispatcher.activated == False:
+ ... break
+ ... time.sleep(0.1)
+ ... else:
+ ... assert False, 'dispatcher never deactivated'
+
+.. [#cleanup1]
+
+ >>> lock.release()
+ >>> old_dispatcher.thread.join(3)
+ >>> old_dispatcher.dead_pools[0].threads[0].join(3)
+
+.. [#last_cleanup]
+
+ >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ >>> dispatcher.thread.join(3)
Property changes on: zc.async/trunk/src/zc/async/catastrophes.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/dispatcher.py 2008-05-09 21:26:03 UTC (rev 86589)
@@ -124,7 +124,7 @@
self.dispatcher = dispatcher
self.name = name
self.queue = Queue.Queue(0)
- self._threads = []
+ self.threads = []
self.setSize(size)
def getSize(self):
@@ -216,16 +216,16 @@
self._size = size
res = []
ct = 0
- for t in self._threads:
+ for t in self.threads:
if t.isAlive():
res.append(t)
ct += 1
- self._threads[:] = res
+ self.threads[:] = res
if ct < size:
for i in range(max(size - ct, 0)):
t = threading.Thread(target=self.perform_thread)
t.setDaemon(True)
- self._threads.append(t)
+ self.threads.append(t)
t.start()
elif ct > size:
# this may cause some bouncing, but hopefully nothing too bad.
@@ -283,8 +283,10 @@
# timeout period when they are begun, so we give a bit of cushion.
self.polls = zc.async.utils.Periodic(
period=datetime.timedelta(minutes=10), buckets=5) # max of 12.5 min
+ self.polls.__parent__ = self
self.jobs = zope.bforest.periodic.OOBForest(
period=datetime.timedelta(minutes=20), count=9) # max of 22.5 min
+ self.jobs.__parent__ = self
self._activated = set()
self.queues = {}
self.dead_pools = []
@@ -362,12 +364,13 @@
else:
zc.async.utils.log.error(
'UUID %s already activated in queue %s '
- '(oid %s): another process? To stop '
+ '(oid %d): another process? (To stop '
'poll attempts in this process, set '
'``zc.async.dispatcher.get().activated = '
"False``. To stop polls permanently, don't "
- 'start a zc.async.dispatcher!',
- self.UUID, queue.name, queue._p_oid)
+ 'start a zc.async.dispatcher!)',
+ self.UUID, queue.name,
+ ZODB.utils.u64(queue._p_oid))
continue
da.activate()
self._activated.add(queue._p_oid)
@@ -455,10 +458,10 @@
self.dead_pools.append(pools.pop(name))
if conn_delta:
db = queues._p_jar.db()
- # this is a bit premature--it should really happen
- # when all threads are complete--but since the pool just
- # complains if the size is not honored, and this approach
- # is easier, we're doing this.
+ # this is a bit premature--it should really happen when
+ # all threads are complete--but since the pool just
+ # complains if the size is not honored, and this
+ # approach is easier, we're doing this.
db.setPoolSize(db.getPoolSize() + conn_delta)
if len(self.queues) > len(poll_info):
conn_delta = 0
@@ -532,28 +535,31 @@
def deactivate(self):
if not self.activated:
raise ValueError('not activated')
- self.activated = False
- transaction.begin()
+ self.activated = None # "in progress"
try:
- queues = self.conn.root().get(zc.async.interfaces.KEY)
- if queues is not None:
- for queue in queues.values():
- da = queue.dispatchers.get(self.UUID)
- if da is not None and da.activated:
- da.deactivate()
- self._commit('trying to tear down')
+ transaction.begin()
+ try:
+ queues = self.conn.root().get(zc.async.interfaces.KEY)
+ if queues is not None:
+ for queue in queues.values():
+ da = queue.dispatchers.get(self.UUID)
+ if da is not None and da.activated:
+ da.deactivate()
+ self._commit('trying to tear down')
+ finally:
+ transaction.abort()
+ self.conn.close()
+ conn_delta = 0
+ for queue_pools in self.queues.values():
+ for name, pool in queue_pools.items():
+ conn_delta += pool.setSize(0)
+ self.dead_pools.append(queue_pools.pop(name))
+ conn_delta -= 1
+ self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
+ zc.async.utils.log.info('deactivated dispatcher %s',
+ self.UUID)
finally:
- transaction.abort()
- self.conn.close()
- conn_delta = 0
- for queue_pools in self.queues.values():
- for name, pool in queue_pools.items():
- conn_delta += pool.setSize(0)
- self.dead_pools.append(queue_pools.pop(name))
- conn_delta -= 1
- self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
- zc.async.utils.log.info('deactivated dispatcher %s',
- self.UUID)
+ self.activated = False # "completed" (can distinguish for tests)
# these methods are used for monitoring and analysis
Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/job.py 2008-05-09 21:26:03 UTC (rev 86589)
@@ -240,6 +240,8 @@
self._callable_name = value.__name__
else:
self._callable_root, self._callable_name = value, None
+ if zc.async.interfaces.IJob.providedBy(self._callable_root):
+ self._callable_root.parent = self
def addCallbacks(self, success=None, failure=None):
if success is not None or failure is not None:
Modified: zc.async/trunk/src/zc/async/tests.py
===================================================================
--- zc.async/trunk/src/zc/async/tests.py 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/tests.py 2008-05-09 21:26:03 UTC (rev 86589)
@@ -143,6 +143,7 @@
'subscribers.txt',
'README.txt',
'README_2.txt',
+ 'catastrophes.txt',
setUp=modSetUp, tearDown=modTearDown,
optionflags=doctest.INTERPRET_FOOTNOTES),
))
Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py 2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/utils.py 2008-05-09 21:26:03 UTC (rev 86589)
@@ -140,7 +140,8 @@
while key in self._data:
key -= 1
self._data[key] = item
- item.parent = self.__parent__ # the agent
+ assert self.__parent__ is not None
+ item.parent = self.__parent__
item.key = key
def iter(self, start=None, stop=None):
More information about the Checkins
mailing list