[Checkins] SVN: zc.async/trunk/s checkpoint: first "catastrophe" example with fix.

Gary Poster gary at zope.com
Fri May 9 17:26:03 EDT 2008


Log message for revision 86589:
  checkpoint: first "catastrophe" example with fix.

Changed:
  U   zc.async/trunk/setup.py
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/README.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/agent.py
  A   zc.async/trunk/src/zc/async/catastrophes.txt
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/job.py
  U   zc.async/trunk/src/zc/async/tests.py
  U   zc.async/trunk/src/zc/async/utils.py

-=-
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/setup.py	2008-05-09 21:26:03 UTC (rev 86589)
@@ -72,7 +72,7 @@
 
 setup(
     name='zc.async',
-    version='1.1',
+    version='1.2',
     packages=find_packages('src'),
     package_dir={'':'src'},
     zip_safe=False,

Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-05-09 21:26:03 UTC (rev 86589)
@@ -6,7 +6,10 @@
 - converted all reports from the dispatcher, including the monitor output,
   to use "unpacked" integer oids.  This addresses a problem that simplejson
   was having in trying to interpret the packed string blobs as unicode, and
-  then making zc.ngi fall over.
+  then making zc.ngi fall over.  To get the object, then, you'll need to
+  use ``ZODB.utils.p64``, like this:
+  ``connection.get(ZODB.utils.p64(INTEGER_OID))``, where ``INTEGER_OID``
+  indicates the integer oid of the object you want to examine.
 
 - added several more tests for the monitor code.
 
@@ -18,6 +21,15 @@
   Fixed, which also means we need a new version of zope.bforest (1.2) for a new
   feature there.
 
+- made the log for finding an activated agent report the pertinent queue's oid
+  as an unpacked integer, rather than the packed string blob.  As mentioned
+  above, use ``ZODB.utils.p64`` to convert back to an oid that the ZODB will
+  recognize.
+
+- Bugfix: in failing a job, the job thought it was still in its agent, and the
+  ``fail`` call failed.  This is now tested by the first example in the new
+  doctest ``catastrophes.txt``.
+
 1.1 (2008-04-24)
 ================
 

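The integer oids mentioned in the CHANGES entries above are the 8-byte packed
oids read as big-endian unsigned integers.  A minimal sketch of the round trip
follows, using only the standard library.  The local ``p64`` and ``u64`` are
stand-ins written for illustration, mirroring what ``ZODB.utils.p64`` and
``ZODB.utils.u64`` do; ``connection`` in the final comment is assumed to be an
open ZODB connection.

```python
import struct

def p64(value):
    """Pack an integer oid into 8 big-endian bytes (stand-in for ZODB.utils.p64)."""
    return struct.pack(">Q", value)

def u64(packed):
    """Unpack an 8-byte oid into an integer (stand-in for ZODB.utils.u64)."""
    return struct.unpack(">Q", packed)[0]

integer_oid = 51  # e.g. an oid copied from a log line or a monitor report
packed_oid = p64(integer_oid)
assert u64(packed_oid) == integer_oid
# With a real ZODB connection, the object would then be loaded with:
#     connection.get(ZODB.utils.p64(integer_oid))
```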
Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/README.txt	2008-05-09 21:26:03 UTC (rev 86589)
@@ -784,13 +784,14 @@
 
     >>> job = queue.put(main_job)
     >>> transaction.commit()
-    >>> reactor.wait_for(job, attempts=3)
-    TIME OUT
-    >>> reactor.wait_for(job, attempts=3)
-    TIME OUT
-    >>> reactor.wait_for(job, attempts=3)
-    TIME OUT
-    >>> reactor.wait_for(job, attempts=3)
+    >>> for i in range(10):
+    ...     reactor.wait_for(job, attempts=3)
+    ...     if job.status == zc.async.interfaces.COMPLETED:
+    ...         break
+    ... else:
+    ...     assert False, 'never completed'
+    ... # doctest: +ELLIPSIS
+    TIME OUT...
     >>> job.result
     42
 
@@ -1212,7 +1213,7 @@
      'shortest successful': (..., 'unnamed'),
      'started': 22,
      'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
-     'statistics start': datetime.datetime(2006, 8, 10, 15, 57, 47, 211),
+     'statistics start': datetime.datetime(2006, 8, 10, 15, ...),
      'successful': 20,
      'unknown': 0}
     >>> reactor.stop()

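The README change above replaces a fixed number of ``wait_for`` calls with a
bounded polling loop that uses Python's ``for ... else``.  A self-contained
sketch of that idiom follows; ``FakeJob``, ``COMPLETED`` and
``wait_until_completed`` are hypothetical names for this example only, not
part of zc.async.

```python
import time

COMPLETED = "completed"  # hypothetical status constant for this sketch

class FakeJob(object):
    """Stand-in job that reports COMPLETED after a few status checks."""
    def __init__(self, checks_needed):
        self._remaining = checks_needed

    @property
    def status(self):
        self._remaining -= 1
        return COMPLETED if self._remaining <= 0 else "active"

def wait_until_completed(job, attempts=10, delay=0.01):
    """Poll job.status up to `attempts` times; raise if it never completes."""
    for _ in range(attempts):
        if job.status == COMPLETED:
            break
        time.sleep(delay)
    else:
        # Runs only when the loop ends without hitting `break`.
        raise AssertionError("never completed")
    return job
```

The ``else`` branch is what turns a silent timeout into a test failure, so the
doctest no longer depends on exactly how many polls the job needs.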
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-05-09 21:26:03 UTC (rev 86589)
@@ -1,168 +1,11 @@
 Bugs and improvements:
 
-- in failing a task, jobs are not in agent:
-  <zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``> failed with traceback:
-  Failure: zc.async.interfaces.AbortedError: 
-
-  <zc.async.job.Job (oid 33532777, db '') ``zc.async.job.Job (oid 33530083, db '') :fail()``> failed with traceback:
-  *--- Failure #7 (pickled) ---
-  /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py:299: _call_with_retry(...)
-   [ Locals ]
-    res : 'None'
-    self : "<zc.async.job.Job (oid 33532777, db '') ``zc.async.job.Job (oid 33530083, db '') :fail()``>"
-    tm : '<transaction._manager.ThreadTransactionManager object at 0xb7a5ea4c>'
-    call : '<function <lambda> at 0xb686fd4c>'
-    ct : '0'
-   ( Globals )
-    success_or_failure : '<function success_or_failure at 0xb6cb917c>'
-    __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.pyc'"
-    persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
-    zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
-    pytz : "<module 'pytz' from '/opt/z4m/eggs/pytz-2008a-py2.4.egg/pytz/__init__.py'>"
-    __name__ : "'zc.async.job'"
-    datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
-    types : "<module 'types' from '/opt/cleanpython24/lib/python2.4/types.pyc'>"
-    transaction : "<module 'transaction' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/__init__.pyc'>"
-    completeStartedJobArguments : '<function completeStartedJobArguments at 0xb6cb91b4>'
-    twisted : "<module 'twisted' from '/opt/z4m/eggs/zope.app.twisted-3.4.0-py2.4.egg/twisted/__init__.pyc'>"
-    _repr : '<function _repr at 0xb6cb9144>'
-    Job : "<class 'zc.async.job.Job'>"
-    ZODB : "<module 'ZODB' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/__init__.pyc'>"
-    rwproperty : "<module 'rwproperty' from '/opt/z4m/eggs/rwproperty-1.0-py2.4.egg/rwproperty.pyc'>"
-    zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
-    BTrees : "<module 'BTrees' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/BTrees/__init__.pyc'>"
-    __doc__ : 'None'
-  /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py:291: <lambda>(...)
-   [ Locals ]
-    self : "<zc.async.job.Job (oid 33532777, db '') ``zc.async.job.Job (oid 33530083, db '') :fail()``>"
-    effective_args : '[]'
-    effective_kwargs : '{}'
-   ( Globals )
-    success_or_failure : '<function success_or_failure at 0xb6cb917c>'
-    __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.pyc'"
-    persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
-    zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
-    pytz : "<module 'pytz' from '/opt/z4m/eggs/pytz-2008a-py2.4.egg/pytz/__init__.py'>"
-    __name__ : "'zc.async.job'"
-    datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
-    types : "<module 'types' from '/opt/cleanpython24/lib/python2.4/types.pyc'>"
-    transaction : "<module 'transaction' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/__init__.pyc'>"
-    completeStartedJobArguments : '<function completeStartedJobArguments at 0xb6cb91b4>'
-    twisted : "<module 'twisted' from '/opt/z4m/eggs/zope.app.twisted-3.4.0-py2.4.egg/twisted/__init__.pyc'>"
-    _repr : '<function _repr at 0xb6cb9144>'
-    Job : "<class 'zc.async.job.Job'>"
-    ZODB : "<module 'ZODB' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/__init__.pyc'>"
-    rwproperty : "<module 'rwproperty' from '/opt/z4m/eggs/rwproperty-1.0-py2.4.egg/rwproperty.pyc'>"
-    zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
-    BTrees : "<module 'BTrees' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/BTrees/__init__.pyc'>"
-    __doc__ : 'None'
-  /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py:362: fail(...)
-   [ Locals ]
-    self : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
-    e : '<zc.async.interfaces.AbortedError instance at 0xb07fa7ac>'
-   ( Globals )
-    success_or_failure : '<function success_or_failure at 0xb6cb917c>'
-    __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.pyc'"
-    persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
-    zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
-    pytz : "<module 'pytz' from '/opt/z4m/eggs/pytz-2008a-py2.4.egg/pytz/__init__.py'>"
-    __name__ : "'zc.async.job'"
-    datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
-    types : "<module 'types' from '/opt/cleanpython24/lib/python2.4/types.pyc'>"
-    transaction : "<module 'transaction' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/__init__.pyc'>"
-    completeStartedJobArguments : '<function completeStartedJobArguments at 0xb6cb91b4>'
-    twisted : "<module 'twisted' from '/opt/z4m/eggs/zope.app.twisted-3.4.0-py2.4.egg/twisted/__init__.pyc'>"
-    _repr : '<function _repr at 0xb6cb9144>'
-    Job : "<class 'zc.async.job.Job'>"
-    ZODB : "<module 'ZODB' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/__init__.pyc'>"
-    rwproperty : "<module 'rwproperty' from '/opt/z4m/eggs/rwproperty-1.0-py2.4.egg/rwproperty.pyc'>"
-    zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
-    BTrees : "<module 'BTrees' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/BTrees/__init__.pyc'>"
-    __doc__ : 'None'
-  /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.py:396: resumeCallbacks(...)
-   [ Locals ]
-    callbacks : '[]'
-    length : '1'
-    tm : '<transaction._manager.ThreadTransactionManager object at 0xb7a5ea4c>'
-    self : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
-    j : "<zc.async.job.Job (oid 33530088, db '') ``zc.z4m.content.query.catalog.addProcessingTask(zc.async.queue.Queue (oid 33148381, db ''), delay=datetime.timedelta(0, 600))``>"
-   ( Globals )
-    success_or_failure : '<function success_or_failure at 0xb6cb917c>'
-    __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/job.pyc'"
-    persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
-    zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
-    pytz : "<module 'pytz' from '/opt/z4m/eggs/pytz-2008a-py2.4.egg/pytz/__init__.py'>"
-    __name__ : "'zc.async.job'"
-    datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
-    types : "<module 'types' from '/opt/cleanpython24/lib/python2.4/types.pyc'>"
-    transaction : "<module 'transaction' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/transaction/__init__.pyc'>"
-    completeStartedJobArguments : '<function completeStartedJobArguments at 0xb6cb91b4>'
-    twisted : "<module 'twisted' from '/opt/z4m/eggs/zope.app.twisted-3.4.0-py2.4.egg/twisted/__init__.pyc'>"
-    _repr : '<function _repr at 0xb6cb9144>'
-    Job : "<class 'zc.async.job.Job'>"
-    ZODB : "<module 'ZODB' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/ZODB/__init__.pyc'>"
-    rwproperty : "<module 'rwproperty' from '/opt/z4m/eggs/rwproperty-1.0-py2.4.egg/rwproperty.pyc'>"
-    zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
-    BTrees : "<module 'BTrees' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/BTrees/__init__.pyc'>"
-    __doc__ : 'None'
-  /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.py:77: jobCompleted(...)
-   [ Locals ]
-    job : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
-    self : '<zc.async.agent.Agent object at 0xb07e0d2c>'
-   ( Globals )
-    __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.pyc'"
-    persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
-    Agent : "<class 'zc.async.agent.Agent'>"
-    datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
-    chooseFirst : '<function chooseFirst at 0xb6700d4c>'
-    addMainAgentActivationHandler : '<function addMainAgentActivationHandler at 0xb6700dbc>'
-    zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
-    __name__ : "'zc.async.agent'"
-    zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
-    __doc__ : 'None'
-  /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.py:61: remove(...)
-   [ Locals ]
-    item : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
-    self : '<zc.async.agent.Agent object at 0xb07e0d2c>'
-   ( Globals )
-    __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.pyc'"
-    persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
-    Agent : "<class 'zc.async.agent.Agent'>"
-    datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
-    chooseFirst : '<function chooseFirst at 0xb6700d4c>'
-    addMainAgentActivationHandler : '<function addMainAgentActivationHandler at 0xb6700dbc>'
-    zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
-    __name__ : "'zc.async.agent'"
-    zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
-    __doc__ : 'None'
-  /opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.py:58: index(...)
-   [ Locals ]
-    i : "<zc.async.job.Job (oid 33532777, db '') ``zc.async.job.Job (oid 33530083, db '') :fail()``>"
-    ix : '0'
-    self : '<zc.async.agent.Agent object at 0xb07e0d2c>'
-    item : "<zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``>"
-   ( Globals )
-    __file__ : "'/opt/z4m/eggs/zc.async-1.1-py2.4.egg/zc/async/agent.pyc'"
-    persistent : "<module 'persistent' from '/opt/z4m/eggs/ZODB3-3.8.0-py2.4-linux-i686.egg/persistent/__init__.pyc'>"
-    Agent : "<class 'zc.async.agent.Agent'>"
-    datetime : "<module 'datetime' from '/opt/cleanpython24/lib/python2.4/lib-dynload/datetime.so'>"
-    chooseFirst : '<function chooseFirst at 0xb6700d4c>'
-    addMainAgentActivationHandler : '<function addMainAgentActivationHandler at 0xb6700dbc>'
-    zope : "<module 'zope' from '/opt/z4m/eggs/zope.app.xmlrpcintrospection-3.4.0a1-py2.4.egg/zope/__init__.pyc'>"
-    __name__ : "'zc.async.agent'"
-    zc : "<module 'zc' from '/opt/z4m/eggs/zc.buildout-1.0.0-py2.4.egg/zc/__init__.pyc'>"
-    __doc__ : 'None'
-  exceptions.ValueError: <zc.async.job.Job (oid 33530083, db '') ``zc.z4m.content.query.catalog.processCatalogQueues()``> not in Agent
-  *--- End of Failure #7 ---
-
-- try to make this look less frightening:
-
-    2008-05-08T13:05:02 ERROR zc.async.events UUID ecbad1cc-1a89-11dd-8f17-0015c5e8367a already activated in queue  (oid ???): another process?  To stop poll attempts in this process, set ``zc.async.dispatcher.get().activated = False``.  To stop polls permanently, don't start a zc.async.dispatcher!
-
 - need retry tasks, particularly for callbacks
 
 - need CRITICAL logs for callbacks
 
+- when database went away
+
 For future versions:
 
 - Write the z3monitor tests.

Modified: zc.async/trunk/src/zc/async/agent.py
===================================================================
--- zc.async/trunk/src/zc/async/agent.py	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/agent.py	2008-05-09 21:26:03 UTC (rev 86589)
@@ -48,7 +48,7 @@
         if self.parent is not None:
             return self.parent.parent
 
-    for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__', 'pull'):
+    for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__'):
         locals()[nm] = zc.async.utils.simpleWrapper(nm)
 
     def index(self, item):
@@ -58,11 +58,16 @@
         raise ValueError("%r not in %s" % (item, self.__class__.__name__))
 
     def remove(self, item):
-        del self[self.index(item)]
+        self.pull(self.index(item))
 
     def __delitem__(self, ix):
-        self._data.pull(ix)
+        self.pull(ix)
 
+    def pull(self, index=0):
+        res = self._data.pull(index)
+        res.parent = None
+        return res
+
     def claimJob(self):
         if len(self._data) < self.size:
             res = self.chooser(self)

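The agent.py change above routes ``remove`` and ``__delitem__`` through a
single ``pull`` method that clears the ``parent`` of whatever it returns.  The
toy classes below sketch that shape under simplifying assumptions: ``Item``
and ``Container`` are hypothetical, and a plain list stands in for the agent's
``_data``.

```python
class Item(object):
    """Stand-in for a job: it only remembers which container holds it."""
    parent = None

class Container(object):
    """Toy container in which every removal path goes through pull()."""
    def __init__(self):
        self._data = []

    def add(self, item):
        self._data.append(item)
        item.parent = self

    def index(self, item):
        for ix, candidate in enumerate(self._data):
            if candidate is item:
                return ix
        raise ValueError("%r not in %s" % (item, self.__class__.__name__))

    def pull(self, index=0):
        res = self._data.pop(index)
        res.parent = None  # the item no longer belongs to this container
        return res

    def remove(self, item):
        self.pull(self.index(item))

    def __delitem__(self, ix):
        self.pull(ix)
```

Because there is only one place where items leave the container, there is only
one place that has to remember to reset ``parent``.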
Added: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt	                        (rev 0)
+++ zc.async/trunk/src/zc/async/catastrophes.txt	2008-05-09 21:26:03 UTC (rev 86589)
@@ -0,0 +1,205 @@
+Catastrophes
+============
+
+Sometimes bad things happen in the course of processing tasks.  You might have
+a MemoryError while processing your main job, or some other failure might
+happen.  That's bad enough.  Of course, you can register some callbacks to
+handle the error, to do what you need to recover.
+
+But then what if the callback itself fails? Perhaps the situation that caused
+the main job to fail with a MemoryError will let the callback start, but not
+complete.  Then when a sibling dispatcher handles the incomplete job, the
+callback will fail.
+
+You, the user, do have some responsibilities. Callbacks should be very fast and
+light. If you want to do something that takes a long time, or might take a long
+time, have your callback put a new job in a queue for the long job. The
+callback itself should then complete quickly and get out of the way.
+
+However, zc.async also has important responsibilities.
+
+This document examines catastrophes like the ones outlined above, to show how
+zc.async handles them, and how you can configure zc.async for these situations.
+Other documents in zc.async, such as the "Dead Dispatchers" section of
+queue.txt, look at this with some isolation and stubs; this document uses a
+complete zc.async setup to examine the system holistically [#setUp]_.
+
+These are the scenarios we'll contemplate:
+
+- The system has a single dispatcher. The dispatcher is working on a job with a
+  callback. The dispatcher dies, and then restarts, cleaning up.
+
+- The system has two dispatchers. One dispatcher is working on a job with a
+  callback, and then dies. The other dispatcher cleans up.
+
+- The system has a single dispatcher.  The dispatcher is working on a job, and
+  successfully completes it.  The callback begins, and then fails.
+
+- The system has a single dispatcher.  The dispatcher is working on a job, and
+  successfully completes it.  The callback begins, and then the dispatcher
+  dies.
+
+- The system has a single dispatcher.  The database goes away, and then comes
+  back.
+
+-------------------------------------------------
+Dispatcher Dies Gracefully While Performing a Job
+-------------------------------------------------
+
+First let's consider how a failed job with a callback or two is handled when
+the dispatcher dies.
+
+Here we start a job.
+
+    >>> import zope.component
+    >>> import threading
+    >>> import transaction
+    >>> import zc.async.interfaces
+    >>> import zc.async.testing
+    >>> import zc.async.dispatcher
+
+    >>> queue = root[zc.async.interfaces.KEY]['']
+    >>> lock = threading.Lock()
+    >>> lock.acquire()
+    True
+    >>> def wait_for_me():
+    ...     lock.acquire()
+    ...     lock.release() # so we can use the same lock again later
+    ...     raise SystemExit() # this will cause the worker thread to exit
+    ...
+    >>> def handle_error(result):
+    ...     return '...I handled the error...'
+    ...
+    >>> job = queue.put(wait_for_me)
+    >>> callback_job = job.addCallbacks(failure=handle_error)
+    >>> transaction.commit()
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> wait_for_start(job)
+
+In this scenario, ``wait_for_me`` is a job that will "unexpectedly" be lost
+when the dispatcher stops working.  ``handle_error`` is the hypothetical
+handler that should be called if the ``wait_for_me`` job fails.
+
+The job has started. Now, the dispatcher suddenly dies without the thread
+performing ``wait_for_me`` getting a chance to finish. For our first example,
+let's give the dispatcher a graceful exit. The dispatcher gets a chance to
+clean up its dispatcher agents, and job.fail() goes into the queue.
+
+    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+    >>> wait_to_deactivate(dispatcher)
+    >>> _ = transaction.begin()
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+    >>> len(queue)
+    1
+    >>> fail_job = queue[0]
+    >>> fail_job
+    <zc.async.job.Job (oid 51, db 'unnamed') ``zc.async.job.Job (oid 30, db 'unnamed') :fail()``>
+    >>> queue[0].callable
+    <bound method Job.fail of <zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.wait_for_me()``>>
+
+Now when the process starts back up again, our callback will be performed.
+
+    >>> old_dispatcher = dispatcher
+    >>> zc.async.dispatcher.clear()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> zc.async.testing.wait_for_result(fail_job)
+    >>> job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> job.result
+    <zc.twist.Failure zc.async.interfaces.AbortedError>
+    >>> callback_job.status == zc.async.interfaces.COMPLETED
+    True
+    >>> callback_job.result
+    '...I handled the error...'
+
+So, our callback had a chance to do whatever it thought appropriate--in this
+case, simply returning a string--once the dispatcher got back online
+[#cleanup1]_.
+
+--------------------------------------------------------------
+Dispatcher Dies "Hard" While Performing a Job, Sibling Resumes
+--------------------------------------------------------------
+
+--------------
+Callback Fails
+--------------
+
+-------------------------------
+Dispatcher Dies During Callback
+-------------------------------
+
+-------------------------------
+Database Disappears For a While
+-------------------------------
+
+---------------------------------------------
+Other Catastrophes, And Your Responsibilities
+---------------------------------------------
+
+There are some catastrophes for which there are no easy fixes like these.  For
+instance, imagine you have communicated with an external system, and gotten a
+reply that you have successfully made a transaction there, but then the
+dispatcher dies, or the database disappears, before you have a chance to redo.
+
+[#last_cleanup]_
+
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
+.. [#setUp]
+
+    >>> import ZODB.FileStorage
+    >>> storage = ZODB.FileStorage.FileStorage(
+    ...     'main.fs', create=True)
+    >>> from ZODB.DB import DB
+    >>> db = DB(storage)
+    >>> conn = db.open()
+    >>> root = conn.root()
+    >>> import zc.async.configure
+    >>> zc.async.configure.base()
+    >>> import zc.async.subscribers
+    >>> import zope.component
+    >>> zope.component.provideHandler(zc.async.subscribers.queue_installer)
+    >>> zope.component.provideHandler(
+    ...     zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.5))
+    >>> zope.component.provideHandler(zc.async.subscribers.agent_installer)
+    >>> import zope.event
+    >>> import zc.async.interfaces
+    >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
+    >>> import transaction
+    >>> _ = transaction.begin()
+
+    >>> import time
+    >>> def wait_for_start(job):
+    ...     for i in range(60):
+    ...         t = transaction.begin()
+    ...         if job.status == zc.async.interfaces.ACTIVE:
+    ...             break
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         assert False, 'job never started'
+
+    >>> def wait_to_deactivate(dispatcher):
+    ...     for i in range(60):
+    ...         if dispatcher.activated == False:
+    ...             break
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         assert False, 'dispatcher never deactivated'
+
+.. [#cleanup1]
+
+    >>> lock.release()
+    >>> old_dispatcher.thread.join(3)
+    >>> old_dispatcher.dead_pools[0].threads[0].join(3)
+
+.. [#last_cleanup]
+
+    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+    >>> dispatcher.thread.join(3)

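The first example in catastrophes.txt holds a worker thread inside a job by
having the test acquire a lock before the job starts.  The sketch below shows
that gating technique on its own with the standard library.  It is a
simplification: there is no dispatcher, the job does not raise ``SystemExit``,
and the ``started`` and ``finished`` events are additions for this example.

```python
import threading

lock = threading.Lock()
lock.acquire()            # hold the lock so the worker blocks inside the job
started = threading.Event()
finished = threading.Event()

def wait_for_me():
    started.set()         # signal that the job has begun
    lock.acquire()        # blocks until the test releases the lock
    lock.release()        # so the same lock can be reused later
    finished.set()

worker = threading.Thread(target=wait_for_me)
worker.daemon = True
worker.start()

started.wait(3)           # the job is now known to be in progress
in_progress = started.is_set() and not finished.is_set()
lock.release()            # let the job run to the end
worker.join(3)
```

While the lock is held, the test can inspect or tear down the surrounding
system knowing exactly where the job is paused.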

Property changes on: zc.async/trunk/src/zc/async/catastrophes.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-05-09 21:26:03 UTC (rev 86589)
@@ -124,7 +124,7 @@
         self.dispatcher = dispatcher
         self.name = name
         self.queue = Queue.Queue(0)
-        self._threads = []
+        self.threads = []
         self.setSize(size)
 
     def getSize(self):
@@ -216,16 +216,16 @@
             self._size = size
         res = []
         ct = 0
-        for t in self._threads:
+        for t in self.threads:
             if t.isAlive():
                 res.append(t)
                 ct += 1
-        self._threads[:] = res
+        self.threads[:] = res
         if ct < size:
             for i in range(max(size - ct, 0)):
                 t = threading.Thread(target=self.perform_thread)
                 t.setDaemon(True)
-                self._threads.append(t)
+                self.threads.append(t)
                 t.start()
         elif ct > size:
             # this may cause some bouncing, but hopefully nothing too bad.
@@ -283,8 +283,10 @@
         # timeout period when they are begun, so we give a bit of cushion.
         self.polls = zc.async.utils.Periodic(
             period=datetime.timedelta(minutes=10), buckets=5) # max of 12.5 min
+        self.polls.__parent__ = self
         self.jobs = zope.bforest.periodic.OOBForest(
             period=datetime.timedelta(minutes=20), count=9) # max of 22.5 min
+        self.jobs.__parent__ = self
         self._activated = set()
         self.queues = {}
         self.dead_pools = []
@@ -362,12 +364,13 @@
                         else:
                             zc.async.utils.log.error(
                                 'UUID %s already activated in queue %s '
-                                '(oid %s): another process?  To stop '
+                                '(oid %d): another process?  (To stop '
                                 'poll attempts in this process, set '
                                 '``zc.async.dispatcher.get().activated = '
                                 "False``.  To stop polls permanently, don't "
-                                'start a zc.async.dispatcher!',
-                                self.UUID, queue.name, queue._p_oid)
+                                'start a zc.async.dispatcher!)',
+                                self.UUID, queue.name,
+                                ZODB.utils.u64(queue._p_oid))
                             continue
                     da.activate()
                     self._activated.add(queue._p_oid)
@@ -455,10 +458,10 @@
                             self.dead_pools.append(pools.pop(name))
                     if conn_delta:
                         db = queues._p_jar.db()
-                        # this is a bit premature--it should really happen
-                        # when all threads are complete--but since the pool just
-                        # complains if the size is not honored, and this approach
-                        # is easier, we're doing this.
+                        # this is a bit premature--it should really happen when
+                        # all threads are complete--but since the pool just
+                        # complains if the size is not honored, and this
+                        # approach is easier, we're doing this.
                         db.setPoolSize(db.getPoolSize() + conn_delta)
             if len(self.queues) > len(poll_info):
                 conn_delta = 0
@@ -532,28 +535,31 @@
     def deactivate(self):
         if not self.activated:
             raise ValueError('not activated')
-        self.activated = False
-        transaction.begin()
+        self.activated = None # "in progress"
         try:
-            queues = self.conn.root().get(zc.async.interfaces.KEY)
-            if queues is not None:
-                for queue in queues.values():
-                    da = queue.dispatchers.get(self.UUID)
-                    if da is not None and da.activated:
-                        da.deactivate()
-                self._commit('trying to tear down')
+            transaction.begin()
+            try:
+                queues = self.conn.root().get(zc.async.interfaces.KEY)
+                if queues is not None:
+                    for queue in queues.values():
+                        da = queue.dispatchers.get(self.UUID)
+                        if da is not None and da.activated:
+                            da.deactivate()
+                    self._commit('trying to tear down')
+            finally:
+                transaction.abort()
+                self.conn.close()
+            conn_delta = 0
+            for queue_pools in self.queues.values():
+                for name, pool in queue_pools.items():
+                    conn_delta += pool.setSize(0)
+                    self.dead_pools.append(queue_pools.pop(name))
+            conn_delta -= 1
+            self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
+            zc.async.utils.log.info('deactivated dispatcher %s',
+                                    self.UUID)
         finally:
-            transaction.abort()
-            self.conn.close()
-        conn_delta = 0
-        for queue_pools in self.queues.values():
-            for name, pool in queue_pools.items():
-                conn_delta += pool.setSize(0)
-                self.dead_pools.append(queue_pools.pop(name))
-        conn_delta -= 1
-        self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
-        zc.async.utils.log.info('deactivated dispatcher %s',
-                                self.UUID)
+            self.activated = False # "completed" (can distinguish for tests)
 
     # these methods are used for monitoring and analysis
 

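The ``deactivate`` change above makes ``activated`` a three-state flag:
``True`` while running, ``None`` while tear-down is in progress, and ``False``
once tear-down has finished, whether or not it succeeded.  A toy sketch of
that pattern follows; ``Worker`` and ``_tear_down`` are hypothetical names,
not zc.async API.

```python
class Worker(object):
    """Toy object with the same three-state flag: True, None, False."""
    def __init__(self):
        self.activated = True

    def _tear_down(self):
        pass  # placeholder for the real cleanup work

    def deactivate(self):
        if not self.activated:
            raise ValueError('not activated')
        self.activated = None  # "in progress"
        try:
            self._tear_down()
        finally:
            self.activated = False  # "completed", even if tear-down raised
```

A watcher that compares ``activated == False``, as ``wait_to_deactivate`` does
in catastrophes.txt, does not match the intermediate ``None``, so it only
returns once tear-down is over.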
Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/job.py	2008-05-09 21:26:03 UTC (rev 86589)
@@ -240,6 +240,8 @@
             self._callable_name = value.__name__
         else:
             self._callable_root, self._callable_name = value, None
+        if zc.async.interfaces.IJob.providedBy(self._callable_root):
+            self._callable_root.parent = self
 
     def addCallbacks(self, success=None, failure=None):
         if success is not None or failure is not None:

Modified: zc.async/trunk/src/zc/async/tests.py
===================================================================
--- zc.async/trunk/src/zc/async/tests.py	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/tests.py	2008-05-09 21:26:03 UTC (rev 86589)
@@ -143,6 +143,7 @@
             'subscribers.txt',
             'README.txt',
             'README_2.txt',
+            'catastrophes.txt',
             setUp=modSetUp, tearDown=modTearDown,
             optionflags=doctest.INTERPRET_FOOTNOTES),
         ))

Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py	2008-05-09 20:28:07 UTC (rev 86588)
+++ zc.async/trunk/src/zc/async/utils.py	2008-05-09 21:26:03 UTC (rev 86589)
@@ -140,7 +140,8 @@
         while key in self._data:
             key -= 1
         self._data[key] = item
-        item.parent = self.__parent__ # the agent
+        assert self.__parent__ is not None
+        item.parent = self.__parent__
         item.key = key
 
     def iter(self, start=None, stop=None):


