[Checkins] SVN: zc.async/trunk/ Mostly: prevent/handle an agent that is incorrectly identified as "dead" by a sibling.

Gary Poster gary at modernsongs.com
Wed Sep 3 22:53:10 EDT 2008


Log message for revision 90777:
  Mostly: prevent/handle an agent that is incorrectly identified as "dead" by a sibling.
  
  - Only claim a job if the dispatcher is activated.  Make the agent check the
    ``activated`` and ``dead`` attributes of the parent dispatcher before
    claiming.
  
  - When activating, also clean out jobs from the dispatcher's agents, just as
    when deactivating.  This should protect against unusual race conditions in
    which the dispatcher got a job after being deactivated.
  
  - Change dispatcher to ping before claiming jobs.
  
  - When a ping reactivates a dispatcher, use the new method ``reactivate``
    rather than ``activate``.  This fires a new ``DispatcherReactivated`` event.
  
  - It's still theoretically possible (for instance, with a badly behaved
    long commit that causes a sibling to believe that the process is dead)
    for an async worker process to be working on a job that it shouldn't
    be: the job has been taken away and is now another process's
    responsibility.  Now, whenever a process is about to start any work
    (especially a retry), it should double-check that the job is registered
    as being performed by itself.  If not, the process should abort the
    transaction, log an error, and give up on the job.  Write conflict
    errors on the job should protect us from the edge cases in this story.
    XXX test
  
  - Make ftesting try to join worker threads, in addition to the polling
    thread, to try to eliminate intermittent test-runner warnings in ftests
    that a thread is left behind.  XXX test
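
A minimal sketch of the two safeguards described in the list above, using toy
stand-in classes rather than the real zc.async objects.  ``ToyDispatcherAgents``,
``ToyAgent``, ``ToyJob``, ``start_work`` and the explicit ``now`` argument are
all hypothetical names for illustration; the real code reads ``activated`` and
``dead`` as attributes of the parent dispatcher and uses
``zc.async.interfaces.ReassignedError``.

```python
import datetime


class ReassignedError(Exception):
    """Toy stand-in for zc.async.interfaces.ReassignedError."""


class ToyDispatcherAgents:
    """Hypothetical stand-in for a dispatcher's record in a queue."""

    def __init__(self, ping_death_interval=datetime.timedelta(seconds=30)):
        self.activated = None   # time of activation, or None if deactivated
        self.last_ping = None
        self.ping_death_interval = ping_death_interval

    def activate(self, now):
        self.activated = self.last_ping = now

    def deactivate(self):
        self.activated = None

    def is_dead(self, now):
        # Assumption: "dead" means activated but silent for too long.
        return (self.activated is not None
                and now - self.last_ping > self.ping_death_interval)


class ToyAgent:
    def __init__(self, parent, pending):
        self.parent = parent
        self.pending = pending

    def claim_job(self, now):
        # Refuse to claim unless the parent is activated and not dead.
        if self.parent.activated is None or self.parent.is_dead(now):
            return None
        return self.pending.pop(0) if self.pending else None


class ToyJob:
    def __init__(self, owner):
        self.owner = owner      # name of the agent registered for this job
        self.status = "assigned"


def start_work(job, my_agent_name):
    # Double-check ownership before starting (or retrying) any work.
    if job.owner != my_agent_name:
        raise ReassignedError(
            "job now belongs to %r, not %r" % (job.owner, my_agent_name))
    job.status = "active"
```

The toy ``start_work`` only raises; per the log message, the real caller is
expected to abort the transaction, log an error, and give up on the job.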
  
  Additional bits in this check-in.
  
  - add test for parallel and serial, and fix exposed bug in serial
  
  - beginnings of work on grok tutorial with zc.async.  More coming.
  
  - add some very basic PyPI trove metadata.
  
  - add in z3c.recipe.tag to buildout
  
  - copy "use NTP for multiple machines" to the "tips" section.
  
  - add ``getAgentName`` to threadlocal object.  This supports the main job for
    this commit.
  
  - move ``wait_for_start``, ``wait_for_deactivation``, and ``wait_for_death`` to
    zc.async.testing from catastrophes.txt.  Now also used in
    parallel_serial.txt.
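
The ``wait_for_*`` helpers mentioned above follow a simple polling pattern.
The following is a generic sketch of that pattern only, not the code in
zc.async.testing; ``wait_for``, its parameters, and the error message are
hypothetical.

```python
import time


def wait_for(condition, attempts=60, delay=0.1,
             message="condition never became true"):
    """Poll ``condition`` until it returns a true value or attempts run out."""
    for _ in range(attempts):
        if condition():
            return
        time.sleep(delay)
    raise AssertionError(message)
```

A test would pass a callable that re-reads the state of interest on each
attempt, for example one that begins a new transaction and then checks a
job's status.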
  
  

Changed:
  _U  zc.async/trunk/
  U   zc.async/trunk/buildout.cfg
  U   zc.async/trunk/setup.py
  A   zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt
  U   zc.async/trunk/sphinx/index.txt
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  A   zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt
  U   zc.async/trunk/src/zc/async/README_1.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/agent.py
  U   zc.async/trunk/src/zc/async/agent.txt
  U   zc.async/trunk/src/zc/async/catastrophes.txt
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/dispatcher.txt
  U   zc.async/trunk/src/zc/async/ftesting.py
  U   zc.async/trunk/src/zc/async/interfaces.py
  U   zc.async/trunk/src/zc/async/job.py
  A   zc.async/trunk/src/zc/async/parallel_serial.txt
  U   zc.async/trunk/src/zc/async/queue.py
  U   zc.async/trunk/src/zc/async/queue.txt
  U   zc.async/trunk/src/zc/async/testing.py
  U   zc.async/trunk/src/zc/async/tests.py
  U   zc.async/trunk/src/zc/async/threadlocal.py
  U   zc.async/trunk/src/zc/async/tips.txt
  U   zc.async/trunk/src/zc/async/utils.py

-=-

Property changes on: zc.async/trunk
___________________________________________________________________
Name: svn:ignore
   - develop-eggs
bin
parts
.installed.cfg
dist
TEST_THIS_REST_BEFORE_REGISTERING.txt
*.kpf
*.bbproject

   + develop-eggs
bin
parts
.installed.cfg
dist
TEST_THIS_REST_BEFORE_REGISTERING.txt
*.kpf
*.bbproject
tags
TAGS
ID


Modified: zc.async/trunk/buildout.cfg
===================================================================
--- zc.async/trunk/buildout.cfg	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/buildout.cfg	2008-09-04 02:53:09 UTC (rev 90777)
@@ -4,6 +4,8 @@
     test
     z3interpreter
     z3test
+    tags
+unzip = true
 
 develop = .
 
@@ -34,3 +36,7 @@
 recipe = zc.recipe.egg
 eggs = zc.async [z3]
 interpreter = z3py
+
+[tags]
+recipe = z3c.recipe.tag:tags
+eggs = zc.async

Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/setup.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -91,6 +91,7 @@
         'ZODB3',
         'pytz',
         'rwproperty',
+        'setuptools',
         'uuid',
         'zc.queue',
         'zc.dict>=1.2.1',
@@ -115,5 +116,12 @@
             'zope.app.component',
             'simplejson',
             ]},
-    url='http://packages.python.org/zc.async'
+    url='http://packages.python.org/zc.async',
+    classifiers=[
+        "Development Status :: 5 - Production/Stable",
+        "Intended Audience :: Developers",
+        "License :: OSI Approved :: Zope Public License",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python",
+        "Framework :: ZODB"],
     )

Added: zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt
===================================================================
--- zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt	                        (rev 0)
+++ zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -0,0 +1 @@
+link ../src/zc/async/QUICKSTART_2_GROK.txt
\ No newline at end of file


Property changes on: zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt
___________________________________________________________________
Name: svn:special
   + *

Modified: zc.async/trunk/sphinx/index.txt
===================================================================
--- zc.async/trunk/sphinx/index.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/sphinx/index.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -93,10 +93,8 @@
    :maxdepth: 1
    
    QUICKSTART_1_VIRTUALENV
+   QUICKSTART_2_GROK
 
-XXX QUICKSTART_ZC_BUILDOUT
-XXX QUICKSTART_GROK
-
 Documentation
 -------------
 

Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -2,7 +2,7 @@
 Changes
 =======
 
-1.4.2 (2008-??-??)
+1.5.0 (2008-??-??)
 ==================
 
 - Documentation improvements.  Converted documentation into Sphinx system.
@@ -18,17 +18,45 @@
 
 - Added zc.async.partial.Partial for backward compatibility purposes.
 
-- Fix support for Twisted installed reactor
+- Fix support for Twisted installed reactor.
 
-- Fix retry behavior for parallel and serial jobs XXX NEEDS TEST
+- Fix retry behavior for parallel and serial jobs
 
 - Tweaked the uuid.txt to mention zdaemon/supervisor rather than Zope 3.
 
-- Fixed some bugs in egg creation
+- Fixed some bugs in egg creation.
 
 - Changed quotas to not use a container that has conflict resolution, since
   these values should be a strict maximum.
 
+- We only want to claim a job if we are activated.   Make the agent check the
+  ``activated`` and ``dead`` attributes of the parent dispatcher before
+  claiming.
+
+- When activating, also clean out jobs from the dispatcher's agents, just as
+  with deactivating.  This should protect from unusual race conditions in
+  which the dispatcher got a job after being deactivated.
+
+- Change dispatcher to ping before claiming jobs.
+
+- when a ping reactivates a dispatcher, use new method ``reactivate`` rather
+  than ``activate``.  This fires a new ``DispatcherReactivated`` event.
+
+- It's still theoretically possible (for instance, with a
+  badly-behaved long commit that causes a sibling to believe that the
+  process is dead) that an async worker process would be working on a
+  job that it shouldn't be.  For instance, the job has been taken away,
+  and is another process' responsibility now.  Now, whenever a
+  process is about to start any work (especially a retry), it should
+  double-check that the job is registered as being performed by itself.
+  If not, the process should abort the transaction, make an error
+  log, and give up on the job.  Write conflict errors on the job should
+  protect us from the edge cases in this story.  XXX test
+
+- Make ftesting try to join worker threads, in addition to polling thread,
+  to try to eliminate intermittent test-runner warnings in ftests that a
+  thread is left behind.  XXX test
+
 1.4.1 (2008-07-30)
 ==================
 

Added: zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt
===================================================================
--- zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt	                        (rev 0)
+++ zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -0,0 +1,183 @@
+=====================
+Quickstart with Grok_
+=====================
+
+Goals
+=====
+
+In this quickstart, we will use zc.async to make a small web application that
+XXX
+
+XXX For simplicity, we'll assume that we are making several instances on the
+same machine, such as you might do with a few processors at your disposal.  To
+get the true advantage of high availability in production, you'd want at least
+two boxes, with a deployment of a ZEO server (or equivalent, for RelStorage),
+some kind of redundancy for your database (ZRS, slony) and instructions for
+each box on how to connect to the ZEO primary.
+
+This quickstart builds on the :ref:`quickstart-with-virtualenv`.  I suggest you
+read that through before this one.  
+
+- That previous quickstart introduces |async| through the Python interpreter
+  for a very casual and quick start.  
+
+- It also is more "pure-Python" with very little understanding needed of
+  additional frameworks to follow and use the examples.
+
+This quickstart instead uses the following somewhat "heavier" technologies.
+
+- |zc.buildout|_ is a way of producing repeatable software build-outs, for
+  development and conceivably for deployment.  It is written in Python, but is
+  not Python-specific, and it has found use as a ``make`` replacement for many
+  projects.
+
+- Grok_ is a web framework emerging from "Zope 3" technologies.  From their
+  website:
+  
+    Grok is a web application framework for Python developers. It is aimed at
+    both beginners and very experienced web developers. Grok has an emphasis on
+    agile development. Grok is easy and powerful.
+
+This guide, then, takes a somewhat slower definition of "quick" for its
+"quickstart", in exchange for more guidance and care with a view towards
+production-readiness.
+
+.. _Grok: http://grok.zope.org/
+
+.. |async| replace:: ``zc.async``
+
+.. _`async`: http://pypi.python.org/pypi/zc.async
+
+.. |zc.buildout| replace:: ``zc.buildout``
+
+.. _`zc.buildout`: http://pypi.python.org/pypi/zc.buildout
+
+Prerequisites
+=============
+
+.. sidebar:: Building Python 2.4.5 on OS X Leopard
+
+   Unfortunately, building a clean, standalone workable Python 2.4.5 on OS X is
+   not obvious.  This is what I recommend, if you are working on that platform.
+   
+   First you need macports.  Go to macports.org and download the newest
+   version.  It doesn't seem to set up the manual path correctly, so after the
+   installation add this to your ~/.profile (or in a similar place)::
+   
+    export MANPATH=/opt/local/man:$MANPATH
+   
+   You'll need a new terminal session (or other shell magic if you know it) for
+   these changes to take effect.  The easiest thing to do is close the shell
+   you are working in and open a new one.
+
+   Download a source distribution of Python 2.4.5.  You may have your own
+   approach as to where to put things, but I'll go with this pattern in this
+   document: ~/src will hold expanded source trees, ~/opt will hold our local
+   Python, and we'll develop in ~/dev.
+   
+   We will want readline and need zlib from macports.
+
+   ::
+
+    $ sudo port -c install readline
+    $ sudo port -c install zlib
+   
+   Now we'll do the usual dance, with a couple of ugly extra steps.
+
+   **Note: replace ``/Users/gary/opt/py`` in the below with your own desired
+   location!**
+
+   ::
+
+    $ MACOSX_DEPLOYMENT_TARGET=10.5 ./configure \
+      --prefix=/Users/gary/opt/py \
+      LDFLAGS=-L/opt/local/lib \
+      OPT=-I/opt/local/include
+    $ make
+    $ make install
+
+   Now, given my ``--prefix``, I'll find my python in
+   ``/Users/gary/opt/py/bin/python``.
+
+Grok requires Python 2.4.  Moreover, for more repeatable installations, many
+developers strongly recommend using a "clean", non-system Python, to reduce the
+probability of unnecessary or spurious problems (in your software *or* in your
+system!).  Therefore, consider building your own Python 2.4 for your
+development.
+
+We'll also expect that your Python has |easy_install|_.  If it doesn't, you can
+just download `ez_setup.py`_ and then run it with your local, development
+Python (e.g., ``~/opt/py/bin/python ez_setup.py``). This will install the
+easy_install command for your development Python in the same bin directory as
+your Python (e.g., ``~/opt/py/bin/easy_install``).
+
+.. |easy_install| replace:: ``easy_install``
+
+.. _`easy_install`: http://peak.telecommunity.com/DevCenter/EasyInstall
+
+.. _`ez_setup.py`: http://peak.telecommunity.com/dist/ez_setup.py
+
+grokproject_
+============
+
+.. sidebar:: |zc.buildout| Conveniences
+
+   You may want to consider the following conveniences if you are building many
+   projects with |zc.buildout|.  They make zc.buildout keep two shared
+   collections across all of your zc.buildout projects.  This can significantly
+   speed up the time to buildout new applications.  One shared collection is a
+   download cache of source distributions and eggs.  The other is an egg cache
+   only, for both the downloaded eggs and the eggs generated on your machine.
+   
+   In your home directory, make a ``.buildout`` directory.  In that directory,
+   make two sub-directories, ``eggs`` and ``download-cache``.  Also in
+   ``.buildout``, create a file named ``default.cfg`` with the following
+   content, where ``/Users/gary`` is replaced with the path to your home
+   directory::
+
+    [buildout]
+    eggs-directory=/Users/gary/.buildout/eggs
+    download-cache=/Users/gary/.buildout/download-cache
+
+   There are many other possible settings to make here (for instance, we could
+   specify the clean Python you built here), but these are all I
+   currently bother with.
+
+   It is also worth mentioning that, as of this writing, setuptools builds eggs
+   in such a way as to confuse the Python debugger.  If you use the Python
+   debugger and discover that you want to see the lines in an egg and can't,
+   the following line (or something like it) will help for non-zipped eggs::
+   
+    find ~/.buildout/eggs-aside/ -name '*.pyc' -exec rm {} \;
+
+Grok has a pleasantly convenient way to start a project.  It is called
+grokproject_.  Use your local Python's ``easy_install`` to install it.  For
+instance, I might type ``~/opt/py/bin/easy_install grokproject``.
+
+After it runs, it should have installed the ``grokproject`` command in the same
+bin directory as your local Python (e.g., ``~/opt/py/bin/grokproject``).
+
+.. _grokproject: http://pypi.python.org/pypi/grokproject
+
+Skeleton
+========
+
+Now we will use grokproject_ to make a skeleton of our package.  Let's just
+call the project "quickstart".  Go to a directory in which you want to develop
+our package.  Then use the newly installed ``grokproject`` command to create
+
+XXX
+
+- include zc.async in setup.py; mention versions.cfg
+
+- set up ZEO
+
+- set up multiple instances
+
+- zope.app.testing = 3.4.1 -> 3.4.2
+
+- set up interpreter
+
+- set up z3monitor
+
+- make separate debug instance


Property changes on: zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: zc.async/trunk/src/zc/async/README_1.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_1.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/README_1.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -970,6 +970,7 @@
      'failed': False,
      'poll id': ...,
      'quota names': (),
+     'reassigned': False,
      'result': None,
      'started': datetime.datetime(...),
      'thread': ...}
@@ -1011,6 +1012,7 @@
      'failed': False,
      'poll id': ...,
      'quota names': (),
+     'reassigned': False,
      'result': '42',
      'started': datetime.datetime(...),
      'thread': ...}
@@ -1164,6 +1166,14 @@
 
 .. [#stop_usage_reactor]
 
+    >>> reactor.stop()
+    >>> zc.async.testing.wait_for_deactivation(dispatcher)
+    >>> for queue_pools in dispatcher.queues.values():
+    ...     for name, pool in queue_pools.items():
+    ...         pool.setSize(0)
+    ...         for thread in pool.threads:
+    ...             thread.join(3)
+    ...
     >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
     {'failed': 2,
      'longest active': None,
@@ -1177,4 +1187,3 @@
      'statistics start': datetime.datetime(2006, 8, 10, 16, ...),
      'successful': 52,
      'unknown': 0}
-    >>> reactor.stop()

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -1,9 +1,9 @@
 - fix up tips so that it looks better
-- finish quickstart
-- write a zc.buildout quickstart
-- write a grok quickstart
-- write test for using installed twisted reactor
-- write test for interrupt retry of parallel/serial
+- write a zc.buildout/grok quickstart
+- zc.z3monitor:
+  + asyncdb count pending callable:foo*
+  + asyncdb count completed agent:entry5
+  + asyncdb countall completed
 
 Improvements
 

Modified: zc.async/trunk/src/zc/async/agent.py
===================================================================
--- zc.async/trunk/src/zc/async/agent.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/agent.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -69,6 +69,16 @@
         return res
 
     def claimJob(self):
+        if not self.parent.activated or self.parent.dead:
+            # we don't want to claim a job unless we are activated.
+            # Normally, this should be the case, but in unusual
+            # circumstances, such as very long commits causing the
+            # ping to not be able to commit, we might get in this
+            # unfortunate circumstance.
+            # TODO: we would like to have a read conflict error if we read
+            # activated but it changed beneath us.  If the ZODB grows a gesture
+            # to cause this, use it.
+            return None
         if len(self._data) < self.size:
             res = self.chooser(self)
             if res is not None:

Modified: zc.async/trunk/src/zc/async/agent.txt
===================================================================
--- zc.async/trunk/src/zc/async/agent.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/agent.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -94,6 +94,36 @@
     >>> len(agent)
     3
 
+An agent may not claim a job if its parent is considered deactivated or dead.
+This is a safeguard for unlikely situations in which a parent dispatcher has
+been incorrectly labeled as dead because of ping times (see catastrophes.txt
+for details).
+
+    >>> da.deactivate()
+    >>> len(agent)
+    0
+    >>> bool(da.activated)
+    False
+    >>> print agent.claimJob()
+    None
+    >>> da.activate()
+    >>> import datetime
+    >>> import pytz
+    >>> da.activated = da.last_ping.value = (datetime.datetime.now(pytz.UTC) -
+    ...                                      datetime.timedelta(days=365))
+    >>> bool(da.activated)
+    True
+    >>> da.dead
+    True
+    >>> print agent.claimJob()
+    None
+    >>> da.deactivate()
+    >>> da.activate()
+    >>> agent.claimJob() is job5
+    True
+    >>> len(agent)
+    1
+
 This particular agent invites you to provide a function to choose jobs.
 The default one simply chooses the first available job in the queue.
 

Modified: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/catastrophes.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -513,7 +513,7 @@
     >>> transaction.commit()
     >>> dispatcher = zc.async.dispatcher.get()
     >>> poll = zc.async.testing.get_poll(dispatcher)
-    >>> wait_for_start(job)
+    >>> zc.async.testing.wait_for_start(job)
 
 In this scenario, ``wait_for_me`` is a job that, the first time it is run, will
 "unexpectedly" be lost while the dispatcher stops working. ``handle_result``
@@ -525,7 +525,7 @@
 clean up its dispatcher agents, and job.handleInterrupt() goes into the queue.
 
     >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
-    >>> wait_to_deactivate(dispatcher)
+    >>> zc.async.testing.wait_for_deactivation(dispatcher)
     >>> _ = transaction.begin()
     >>> job.status == zc.async.interfaces.ACTIVE
     True
@@ -586,7 +586,7 @@
     >>> transaction.commit()
     >>> dispatcher = zc.async.dispatcher.get()
     >>> poll = zc.async.testing.get_poll(dispatcher)
-    >>> wait_for_start(job)
+    >>> zc.async.testing.wait_for_start(job)
 
 Now we'll "crash" the dispatcher.
 
@@ -679,11 +679,10 @@
     0
     >>> import datetime
     >>> da.ping_death_interval = datetime.timedelta(seconds=1)
-    >>> da.dead
-    True
+    >>> transaction.commit()
+    >>> zc.async.testing.wait_for_death(da)
     >>> bool(da.activated)
     True
-    >>> transaction.commit()
 
 After the next poll, the dispatcher will have cleaned up its old tasks in the
 same way we saw in the previous example. The job's ``handleInterrupt`` method
@@ -747,7 +746,7 @@
     >>> transaction.commit()
     >>> dispatcher = zc.async.dispatcher.get()
     >>> poll = zc.async.testing.get_poll(dispatcher)
-    >>> wait_for_start(job)
+    >>> zc.async.testing.wait_for_start(job, seconds=30) # XXX not sure why so long
 
 Now we'll start up an alternate dispatcher.
 
@@ -764,6 +763,7 @@
 
     >>> da.ping_death_interval = datetime.timedelta(seconds=60)
     >>> transaction.commit()
+    >>> 
 
 Now we'll "crash" the dispatcher.
 
@@ -771,6 +771,8 @@
     ...                              # cleanup
     >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
     >>> dispatcher.thread.join(3)
+    >>> dispatcher.thread.isAlive()
+    False
 
 As discussed in the previous example, the polling hasn't timed out yet, so the
 alternate dispatcher can't know that the first one is dead. Therefore, the job
@@ -808,11 +810,10 @@
     datetime.timedelta(0, 60)
     >>> import datetime
     >>> da.ping_death_interval = datetime.timedelta(seconds=1)
-    >>> da.dead
-    True
+    >>> transaction.commit()
+    >>> zc.async.testing.wait_for_death(da)
     >>> bool(da.activated)
     True
-    >>> transaction.commit()
 
 After the second dispatcher gets a poll--a chance to notice--it will have
 cleaned up the first dispatcher's old tasks in the same way we saw in the
@@ -878,25 +879,6 @@
     >>> import transaction
     >>> _ = transaction.begin()
 
-    >>> from time import sleep as time_sleep # we import in this manner so
-    ... # that our testing monkey-patch of time.sleep does not affect our tests
-    >>> def wait_for_start(job):
-    ...     for i in range(60):
-    ...         t = transaction.begin()
-    ...         if job.status == zc.async.interfaces.ACTIVE:
-    ...             break
-    ...         time_sleep(0.1)
-    ...     else:
-    ...         assert False, 'job never started'
-
-    >>> def wait_to_deactivate(dispatcher):
-    ...     for i in range(60):
-    ...         if dispatcher.activated == False:
-    ...             break
-    ...         time_sleep(0.1)
-    ...     else:
-    ...         assert False, 'dispatcher never deactivated'
-
 .. [#cleanup1]
 
     >>> lock.release()

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -61,6 +61,7 @@
 
     def perform_thread(self):
         zc.async.local.dispatcher = self.dispatcher
+        zc.async.local.name = self.name # this is the name of this pool's agent
         conn = self.dispatcher.db.open()
         try:
             job_info = self.queue.get()
@@ -141,6 +142,11 @@
                                     transaction.abort() # retry forever (!)
                                 else:
                                     break
+                    except zc.async.interfaces.ReassignedError:
+                        transaction.abort()
+                        info['reassigned'] = True
+                        # will need to get next job_info and continue
+                    # EXPLOSIVE_ERRORS includes Reassigned: order is important
                     except zc.async.utils.EXPLOSIVE_ERRORS:
                         transaction.abort()
                         raise
@@ -165,7 +171,7 @@
                     info['completed'] = datetime.datetime.utcnow()
                 finally:
                     zc.async.local.job = None # also in job (here for paranoia)
-                    transaction.abort()
+                    transaction.abort() # (also paranoia)
                 zc.async.utils.tracelog.info(
                     'completed in thread %d: %s',
                     info['thread'], info['call'])
@@ -229,8 +235,8 @@
     thread = None # this is just a placeholder that other code can use the
     # way that zc.async.subscribers.ThreadedDispatcherInstaller.__call__ does.
 
-    def __init__(self, db, reactor=None, poll_interval=5, uuid=None, jobs_size=200,
-                 polls_size=400):
+    def __init__(self, db, reactor=None, poll_interval=5, uuid=None,
+                 jobs_size=200, polls_size=400):
         if uuid is None:
             uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
         if uuid in _dispatchers:
@@ -331,6 +337,10 @@
                         self._activated.add(queue._p_oid)
                     else:
                         continue
+                identifier = 'committing ping for UUID %s' % (self.UUID,)
+                zc.async.utils.try_five_times(
+                    lambda: queue.dispatchers.ping(self.UUID), identifier,
+                    transaction)
                 queue_info = poll_info[queue.name] = {}
                 pools = self.queues.get(queue.name)
                 if pools is None:
@@ -376,7 +386,8 @@
                                     'call': repr(job),
                                     'started': None,
                                     'completed': None,
-                                    'thread': None}
+                                    'thread': None,
+                                    'reassigned': False}
                             started_jobs.append(info)
                             dbname = getattr(
                                 job._p_jar.db(), 'database_name', None)
@@ -386,10 +397,6 @@
                             pool.queue.put(
                                 (job._p_oid, dbname, info))
                             job = self._getJob(agent)
-                identifier = 'committing ping for UUID %s' % (self.UUID,)
-                zc.async.utils.try_five_times(
-                    lambda: queue.dispatchers.ping(self.UUID), identifier,
-                    transaction)
                 if len(pools) > len(queue_info):
                     conn_delta = 0
                     for name, pool in pools.items():

Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/dispatcher.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -295,6 +295,7 @@
      'failed': False,
      'poll id': ...,
      'quota names': (),
+     'reassigned': False,
      'result': '42',
      'started': datetime.datetime(...),
      'thread': ...}
@@ -412,6 +413,40 @@
     >>> wait_for_result(job4)
     "reply is 'HIYA'.  Locally it is MISSING."
 
+Sometimes, hopefully rarely-to-never, the dispatcher polls don't happen
+frequently enough.  This might happen with a very long database commit, for
+instance, that locks out other changes until it is done.  In this case, a
+dispatcher is incorrectly regarded as dead (see ``catastrophes.txt`` for a
+detailed discussion).  The dispatcher polls before it gets jobs, which
+informs the queue that it is not actually dead.  Then the agent is still
+willing to claim jobs.
+
+    >>> job5 = queue.put(zc.async.job.Job(operator.mod, 85, 43))
+    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+    >>> import pytz
+    >>> da.activated = da.last_ping.value = (datetime.datetime.now(pytz.UTC) -
+    ...                                      datetime.timedelta(days=365))
+    >>> da.dead
+    True
+    >>> da.deactivate()
+    >>> bool(da.activated)
+    False
+    >>> transaction.commit()
+    >>> ignore = get_poll(dispatcher)
+    >>> ignore = transaction.begin()
+    >>> bool(da.activated)
+    True
+    >>> evs = eventtesting.getEvents(
+    ...     zc.async.interfaces.IDispatcherReactivated)
+    >>> evs # doctest: +ELLIPSIS
+    [<zc.async.interfaces.DispatcherReactivated object at ...>]
+    >>> evs[0].object._p_oid == da._p_oid
+    True
+    >>> job5.status != zc.async.interfaces.PENDING
+    True
+    >>> wait_for_result(job5)
+    42
+
 We can analyze the work the dispatcher has done. The records for this generally
 only go back about ten or twelve minutes--just enough to get a feel for the
 current health of the dispatcher. Use the log if you want more long-term
@@ -425,10 +460,10 @@
      'shortest active': None,
      'shortest failed': (..., 'unnamed'),
      'shortest successful': (..., 'unnamed'),
-     'started': 8,
+     'started': 9,
      'statistics end': datetime.datetime(...),
      'statistics start': datetime.datetime(...),
-     'successful': 6,
+     'successful': 7,
      'unknown': 0}
 
 We can get a report on the reactor's status.

Modified: zc.async/trunk/src/zc/async/ftesting.py
===================================================================
--- zc.async/trunk/src/zc/async/ftesting.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/ftesting.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -49,4 +49,9 @@
         del dispatcher._debug_handler
     dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
     dispatcher.thread.join(3)
+    for queue_pools in dispatcher.queues.values():
+        for name, pool in queue_pools.items():
+            pool.setSize(0)
+            for thread in pool.threads:
+                thread.join(3)
     zc.async.dispatcher.clear()

Modified: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/interfaces.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -142,6 +142,12 @@
 class DispatcherDeactivated(AbstractObjectEvent):
     zope.interface.implements(IDispatcherDeactivated)
 
+class IDispatcherReactivated(IObjectEvent):
+    """Dispatcher was reactivated after mistaken deactivation"""
+
+class DispatcherReactivated(AbstractObjectEvent):
+    zope.interface.implements(IDispatcherReactivated)
+
 class IObjectAdded(IObjectEvent):
     """Object was added to the database"""
 
@@ -173,6 +179,13 @@
     This is almost certainly a programmer error."""
 
 
+class ReassignedError(Exception):
+    """The job has been reassigned to another process.
+    
+    This should only happen when a polling timeout has made a not-dead process
+    appear to be dead to a sibling."""
+
+
 class IAbstractJob(zope.interface.Interface):
 
     parent = zope.interface.Attribute(
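
Note on ``ReassignedError`` above: per the log message, a worker should
confirm that a job is still registered to itself before doing any work, and
give up if it is not. The following is a simplified sketch of that decision,
not the code in job.py. The ``check_reassigned`` function, the status strings,
and the (agent name, dispatcher UUID) pairs are assumptions made for
illustration, and the real check also takes the job's current result into
account. Written for Python 3.

```python
class ReassignedError(Exception):
    """Stand-in for zc.async.interfaces.ReassignedError."""


def check_reassigned(status, expected_statuses, owner, current):
    """Raise ReassignedError unless the job still belongs to ``current``.

    ``owner`` and ``current`` are hypothetical (agent name, dispatcher UUID)
    pairs.  ``current`` is None outside a worker thread and ``owner`` is None
    when the job is not held by an agent; in both cases only the status is
    checked.
    """
    if status not in expected_statuses:
        raise ReassignedError('unexpected status %r' % (status,))
    if current is not None and owner is not None and owner != current:
        raise ReassignedError('job now belongs to %r' % (owner,))


me = ('main', 'uuid-1')
# Same owner and an expected status: passes silently.
check_reassigned('active', ('active',), owner=me, current=me)

outcome = 'kept'
try:
    # A sibling dispatcher has claimed the job in the meantime.
    check_reassigned('active', ('active',), owner=('main', 'uuid-2'), current=me)
except ReassignedError:
    outcome = 'gave up'
```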

Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/job.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -591,15 +591,19 @@
                 policy, self, call_name)
             return None
         identifier = 'getting result for %s retry for %r' % (call_name, self)
-        return zc.async.utils.never_fail(lambda: call(*args), identifier, tm)
+        res = zc.async.utils.never_fail(lambda: call(*args), identifier, tm)
+        self._check_reassigned((zc.async.interfaces.ACTIVE,)) # will raise
+        # exception if necessary
+        return res
 
     def __call__(self, *args, **kwargs):
-        if self.status not in (zc.async.interfaces.NEW,
-                               zc.async.interfaces.ASSIGNED):
+        statuses = (zc.async.interfaces.NEW, zc.async.interfaces.ASSIGNED)
+        if self.status not in statuses:
             raise zc.async.interfaces.BadStatusError(
                 'can only call a job with NEW or ASSIGNED status')
         tm = transaction.interfaces.ITransactionManager(self)
         def prepare():
+            self._check_reassigned(statuses)
             self._status_id = 1 # ACTIVE
             self._active_start = datetime.datetime.now(pytz.UTC)
             effective_args = list(args)
@@ -692,6 +696,7 @@
                         self, res)
                 res = failure
                 def complete():
+                    self._check_reassigned((zc.async.interfaces.ACTIVE,))
                     self._result = res
                     self._status_id = 2 # CALLBACKS
                     self._active_end = datetime.datetime.now(pytz.UTC)
@@ -794,6 +799,31 @@
                 queue.put(self, begin_after=now+when)
         return self
 
+    def _check_reassigned(self, expected_statuses):
+        agent = self.agent
+        res = self.status not in expected_statuses or (
+            zc.async.interfaces.IAgent.providedBy(agent) and
+            not zc.async.interfaces.IJob.providedBy(self._result) and
+            zc.async.local.getAgentName() is not None and
+            (zc.async.local.getAgentName() != agent.name or
+             zc.async.local.getDispatcher().UUID != agent.parent.UUID))
+        if res:
+            # the only known scenario for this to occur is the following.
+            # agent took job.  dispatcher gave it to a thread.  While
+            # performing the job, the poll was unable to write to the db,
+            # perhaps because of a database disconnect or because of a
+            # too-long commit in another process or thread.  Therefore,
+            # a sibling has noticed that this agent seems to have died
+            # and put this job back in the queue, where it has been claimed
+            # by another process/agent.
+            # It's debatable whether this is CRITICAL or ERROR level.  We'll
+            # go with ERROR for now.
+            zc.async.utils.log.error(
+                'Job %r was reassigned.  Likely cause was that polling was '
+                'unable to occur as regularly as expected, perhaps because of '
+                'long commit times in the application.', self)
+            raise zc.async.interfaces.ReassignedError()
+
     def _set_result(self, res, tm, data_cache=None):
         # returns whether to call ``resumeCallbacks``
         callback = True
@@ -814,7 +844,8 @@
             self._active_end = datetime.datetime.now(pytz.UTC)
         if self._retry_policy is not None and data_cache:
             self._retry_policy.updateData(data_cache)
-        tm.commit()
+        tm.commit() # this should raise a ConflictError if the job has been
+        # reassigned.
         return callback
 
     def _log_completion(self, res):
@@ -921,11 +952,13 @@
         if self.status != zc.async.interfaces.CALLBACKS:
             raise zc.async.interfaces.BadStatusError(
                 'can only resumeCallbacks on a job with CALLBACKS status')
+        self._check_reassigned((zc.async.interfaces.CALLBACKS,))
         callbacks = list(self.callbacks)
         tm = transaction.interfaces.ITransactionManager(self)
         length = 0
         while 1:
             for j in callbacks:
+                self._check_reassigned((zc.async.interfaces.CALLBACKS,))
                 if zc.async.interfaces.ICallbackProxy.providedBy(j):
                     j = j.getJob(self.result)
                 status = j.status
@@ -991,7 +1024,12 @@
             queue.put(postprocess)
 
 def _schedule_serial(*jobs, **kw):
-    _queue_next(zc.async.local.getJob())
+    for ix, job in enumerate(jobs): # important for interrupts
+        if job.status == zc.async.interfaces.NEW:
+            break
+    else:
+        ix += 1
+    _queue_next(zc.async.local.getJob(), ix)
     return kw['postprocess']
 
 def serial(*jobs, **kw):
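
Note on the ``_schedule_serial`` change above: after an interrupt, the serial
helper has to resume at the first inner job that has not yet been started
rather than at the beginning. A standalone sketch of that for/else scan
follows. The ``first_new_index`` name and the status strings are hypothetical;
the real code compares against ``zc.async.interfaces.NEW``. Written for
Python 3.

```python
def first_new_index(statuses, new='new'):
    """Return the index of the first status equal to ``new``.

    If no status matches, return ``len(statuses)``, meaning that every
    inner job has already been started and nothing is left to queue.
    """
    for ix, status in enumerate(statuses):
        if status == new:
            break
    else:
        # The loop finished without ``break``: no job is still new.
        ix = len(statuses)
    return ix


resume_at = first_new_index(['completed', 'active', 'new', 'new'])
all_started = first_new_index(['completed', 'completed'])
```

Using ``len(statuses)`` in the ``else`` branch, rather than incrementing the
loop variable, also covers an empty sequence, where the loop variable would
never be bound.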

Added: zc.async/trunk/src/zc/async/parallel_serial.txt
===================================================================
--- zc.async/trunk/src/zc/async/parallel_serial.txt	                        (rev 0)
+++ zc.async/trunk/src/zc/async/parallel_serial.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -0,0 +1,234 @@
+The ``parallel`` and ``serial`` helpers are described in README_1.txt, the
+usage document.  This document is a maintainer/test document meant to show
+how ``parallel`` and ``serial`` behave when they are interrupted.
+
+The two scenarios we want to show, for each helper, are an interruption during
+a composite job, and an interruption during the postprocess.
+
+First, we set up a world and some components for the test scenario.  You might
+want to try skipping to the next text block, if you are reading this through
+the first time, so you can get a broad idea of what is going on.
+
+    >>> import ZODB.FileStorage
+    >>> storage = ZODB.FileStorage.FileStorage(
+    ...     'zc_async.fs', create=True)
+    >>> from ZODB.DB import DB
+    >>> db = DB(storage)
+    >>> import zc.async.configure
+    >>> zc.async.configure.base()
+    >>> zc.async.configure.start(db, poll_interval=0.1)
+    >>> conn = db.open()
+    >>> import zc.async.interfaces
+    >>> queue = zc.async.interfaces.IQueue(conn)
+
+    >>> import zc.async.job
+    >>> import transaction
+    >>> import threading
+    >>> def normal_component():
+    ...     return 25
+    >>> component_lock = threading.Lock()
+    >>> component_lock.acquire()
+    True
+    >>> component_fail_flag = True
+    >>> def pause_component():
+    ...     global component_fail_flag
+    ...     if component_fail_flag:
+    ...         component_fail_flag = False
+    ...         component_lock.acquire()
+    ...         component_lock.release() # so we can use the same lock again later
+    ...         raise SystemExit() # this will cause the worker thread to exit
+    ...     else:
+    ...         component_lock.acquire() # for the next run
+    ...         component_fail_flag = True # for the next run
+    ...         return 17
+    ...
+    >>> postprocess_lock = threading.Lock()
+    >>> postprocess_lock.acquire()
+    True
+    >>> postprocess_fail_flag = True
+    >>> def pause_postprocess(job1, job2):
+    ...     global postprocess_fail_flag
+    ...     if postprocess_fail_flag:
+    ...         postprocess_fail_flag = False
+    ...         postprocess_lock.acquire()
+    ...         postprocess_lock.release() # so we can use the same lock again later
+    ...         raise SystemExit() # this will cause the worker thread to exit
+    ...     else:
+    ...         postprocess_lock.acquire() # for the next run
+    ...         postprocess_fail_flag = True # for the next run
+    ...         return job1.result + job2.result
+    ...
+    >>> def stop_dispatcher(dispatcher, lock=None):
+    ...     dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+    ...     zc.async.testing.wait_for_deactivation(dispatcher)
+    ...     dispatcher.thread.join(3)
+    ...     if lock is not None:
+    ...         lock.release()
+    ...     for queue_pools in dispatcher.queues.values():
+    ...         for name, pool in queue_pools.items():
+    ...             pool.setSize(0)
+    ...             for thread in pool.threads:
+    ...                 thread.join(3)
+    ...
+
+First we'll test ``serial``.  We stop the worker once while it is working on
+one of the serialized jobs, and once while it is working on the postprocess.
+Both examples use two soft interrupts (i.e., with clean shut-down; see
+catastrophes.txt).
+
+We create a job with two parts, the second of which will be interrupted; and
+a postprocess, which will also be interrupted.  We let it run through the first
+inner job, and start the second.
+
+    >>> job = queue.put(zc.async.job.serial(normal_component, pause_component,
+    ...             postprocess=pause_postprocess))
+    >>> transaction.commit()
+    >>> import zc.async.testing
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> zc.async.testing.wait_for_start(job.args[1])
+
+We interrupt the dispatcher.  Because it is a clean shut-down, we have two
+``handleInterrupt`` jobs waiting afterwards.  One is for the main job
+(``schedule_serial``), and the other is for the second inner job
+(``pause_component``).
+
+    >>> stop_dispatcher(dispatcher, component_lock) # INTERRUPT 1
+
+    >>> _ = transaction.begin()
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+    >>> job.args[1].status == zc.async.interfaces.ACTIVE
+    True
+    >>> len(queue)
+    2
+    >>> queue[0] # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[1] # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[0].callable # doctest: +ELLIPSIS
+    <bound method Job.handleInterrupt of <...Job... ``...schedule_serial...``>>
+    >>> queue[1].callable # doctest: +ELLIPSIS
+    <bound method Job.handleInterrupt of <...Job... ``...pause_component()``>>
+
+We start up the dispatcher again (actually, it is a new one, but it is
+effectively the same in terms of its UUID).
+
+    >>> zc.async.dispatcher.clear()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> zc.async.testing.wait_for_start(job.kwargs['postprocess'])
+
+We interrupt the dispatcher again.  Now the postprocessing was interrupted.
+
+    >>> stop_dispatcher(dispatcher, postprocess_lock) # INTERRUPT 2
+
+    >>> _ = transaction.begin()
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+    >>> job.kwargs['postprocess'].status == zc.async.interfaces.ACTIVE
+    True
+    >>> len(queue)
+    2
+    >>> queue[0] # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[1] # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[0].callable # doctest: +ELLIPSIS
+    <bound method Job.handleInterrupt of <...Job... ``...schedule_serial...``>>
+    >>> queue[1].callable # doctest: +ELLIPSIS
+    <...method Job.handleInterrupt of <...Job... ``...pause_postprocess(...``>>
+
+Finally we restart without subsequent interruptions.  The job runs
+successfully to completion.
+
+    >>> zc.async.dispatcher.clear()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
+    >>> dispatcher = zc.async.dispatcher.get()
+
+    >>> zc.async.testing.wait_for_result(job)
+    42
+
+Now we'll do the same thing for ``parallel``.  You'll notice that this is
+virtually identical, other than a slightly different set up.
+
+We create a job with two parts, the first of which will be interrupted; and
+a postprocess, which will also be interrupted.  We let the second inner job
+run to completion, and wait for the first to start.
+
+    >>> job = queue.put(zc.async.job.parallel(
+    ...     pause_component, normal_component, postprocess=pause_postprocess))
+    >>> transaction.commit()
+    >>> import zc.async.testing
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> zc.async.testing.wait_for_result(job.args[1])
+    25
+    >>> zc.async.testing.wait_for_start(job.args[0])
+
+We interrupt the dispatcher.  Because it is a clean shut-down, we have two
+``handleInterrupt`` jobs waiting afterwards.  One is for the main job
+(``schedule_parallel``), and the other is for the first inner job
+(``pause_component``).
+
+    >>> stop_dispatcher(dispatcher, component_lock) # INTERRUPT 1
+
+    >>> _ = transaction.begin()
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+    >>> job.args[0].status == zc.async.interfaces.ACTIVE
+    True
+    >>> len(queue)
+    2
+    >>> queue[0] # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[1] # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[0].callable # doctest: +ELLIPSIS
+    <bound method Job.handleInterrupt of <...Job... ``...schedule_parallel...``>>
+    >>> queue[1].callable # doctest: +ELLIPSIS
+    <bound method Job.handleInterrupt of <...Job... ``...pause_component()``>>
+
+We start up the dispatcher again (actually, it is a new one, but it is
+effectively the same in terms of its UUID).
+
+    >>> zc.async.dispatcher.clear()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> zc.async.testing.wait_for_start(job.kwargs['postprocess'])
+
+We interrupt the dispatcher again.  Now the postprocessing was interrupted.
+
+    >>> stop_dispatcher(dispatcher, postprocess_lock) # INTERRUPT 2
+
+    >>> _ = transaction.begin()
+    >>> job.status == zc.async.interfaces.ACTIVE
+    True
+    >>> job.kwargs['postprocess'].status == zc.async.interfaces.ACTIVE
+    True
+    >>> len(queue)
+    2
+    >>> queue[0] # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[1] # doctest: +ELLIPSIS
+    <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+    >>> queue[0].callable # doctest: +ELLIPSIS
+    <bound method Job.handleInterrupt of <...Job... ``...schedule_parallel...``>>
+    >>> queue[1].callable # doctest: +ELLIPSIS
+    <...method Job.handleInterrupt of <...Job... ``...pause_postprocess(...``>>
+
+Finally we restart without subsequent interruptions.  The job runs
+successfully to completion.
+
+    >>> zc.async.dispatcher.clear()
+    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+    ...         poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
+    >>> dispatcher = zc.async.dispatcher.get()
+
+    >>> zc.async.testing.wait_for_result(job)
+    42
+
+Shut down.
+
+    >>> stop_dispatcher(dispatcher)


Property changes on: zc.async/trunk/src/zc/async/parallel_serial.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: zc.async/trunk/src/zc/async/queue.py
===================================================================
--- zc.async/trunk/src/zc/async/queue.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/queue.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -85,14 +85,17 @@
     def activate(self):
         if self.activated:
             raise ValueError('Already activated')
+        # in exceptional circumstances, the agents may have in-progress jobs
+        # left in them.  These will never be worked on, and will block the
+        # agents from using these slots in their "size", until the jobs are
+        # removed. This can be catastrophic.  Therefore we iterate over all
+        # the agents to make sure they are all clean before activating.
+        self._clean()
         self.activated = datetime.datetime.now(pytz.UTC)
         zope.event.notify(
             zc.async.interfaces.DispatcherActivated(self))
 
-    def deactivate(self):
-        if not self.activated:
-            raise ValueError('Not activated')
-        self.activated = None
+    def _clean(self):
         queue = self.parent
         assert zc.async.interfaces.IQueue.providedBy(queue)
         for agent in self.values():
@@ -141,9 +144,22 @@
                         job = agent.pull()
                     except IndexError:
                         job = None
+
+    def deactivate(self):
+        if not self.activated:
+            raise ValueError('Not activated')
+        self.activated = None
+        self._clean()
         zope.event.notify(
             zc.async.interfaces.DispatcherDeactivated(self))
 
+    def reactivate(self):
+        # this is called *only* by ``poll``.  ``poll`` calls ``reactivate``
+        # when ``poll`` discovers that a dispatcher, thought dead, is still
+        # alive.
+        self.activated = datetime.datetime.now(pytz.UTC)
+        zope.event.notify(
+            zc.async.interfaces.DispatcherReactivated(self))
 
 class Queues(zc.async.utils.Dict):
 
@@ -188,7 +204,13 @@
                 "if the dispatcher was inappropriately viewed as ``dead`` and "
                 "deactivated, you should investigate the cause.",
                 uuid)
-            da.activate()
+            # we do this rather than calling ``activate`` because the semantics
+            # are different.  ``activate`` is after a true deactivation, and
+            # cleans out the agents and fires off an activation event.  This
+            # is inappropriate here, and could easily cause problems.
+            # ``reactivate`` is specifically for this circumstance: a
+            # dispatcher thought dead is discovered to be alive.
+            da.reactivate()
         now = datetime.datetime.now(pytz.UTC)
         last_ping = da.last_ping.value
         if (last_ping is None or
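
Note on the queue.py change above: ``activate`` and ``deactivate`` both clean
the agents, while ``reactivate`` only restores the timestamp and fires its own
event. The toy model below sketches that difference under simplifying
assumptions. The class name, the ``claimed``/``requeued`` lists, and the event
strings are hypothetical, and the real ``_clean`` does more than move jobs
back (the queue.txt doctest shows ``handleInterrupt`` and ``resumeCallbacks``
jobs being queued). Written for Python 3.

```python
import datetime


class DispatcherAgentsSketch:
    """Toy model of the three activation transitions."""

    def __init__(self):
        self.activated = None
        self.claimed = []    # jobs currently held by this dispatcher's agents
        self.requeued = []   # jobs handed back to the queue
        self.events = []

    def _now(self):
        return datetime.datetime.now(datetime.timezone.utc)

    def _clean(self):
        # Hand every claimed job back so that agent slots are free again.
        self.requeued.extend(self.claimed)
        del self.claimed[:]

    def activate(self):
        if self.activated:
            raise ValueError('Already activated')
        self._clean()
        self.activated = self._now()
        self.events.append('activated')

    def deactivate(self):
        if not self.activated:
            raise ValueError('Not activated')
        self.activated = None
        self._clean()
        self.events.append('deactivated')

    def reactivate(self):
        # No cleaning: jobs in progress are left with their agents.
        self.activated = self._now()
        self.events.append('reactivated')


da = DispatcherAgentsSketch()
da.claimed.append('job5')
da.reactivate()
kept_after_reactivate = list(da.claimed)
da.deactivate()
```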

Modified: zc.async/trunk/src/zc/async/queue.txt
===================================================================
--- zc.async/trunk/src/zc/async/queue.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/queue.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -605,8 +605,8 @@
     >>> evs[0].object is da
     True
 
-Now a dispatcher should iterate over agents and look for jobs.  There are
-not any agents at the moment.
+Now, after activation, a dispatcher should iterate over agents and look for
+jobs.  There are not any agents at the moment.
 
     >>> len(da)
     0
@@ -836,18 +836,38 @@
     >>> alt_agent.completed.first() is jobD
     True
 
+Activating a dispatcher also cleans out active jobs from its agents. Doing
+it on activation is a fail-safe: normally, agents should not have jobs in
+them after deactivation.
+
+    >>> alt_agent.claimJob() is jobE
+    True
+    >>> len(alt_agent)
+    1
+    >>> alt_da.activate()
+    >>> len(alt_agent)
+    0
+    >>> queue[0] is jobA
+    True
+    >>> queue[1].callable == jobB.handleInterrupt
+    True
+    >>> queue[2].callable == jobC.resumeCallbacks
+    True
+    >>> queue[3] is jobE
+    True
+
 As an aside, notice that the ``handleInterrupt`` and ``resumeCallbacks`` jobs
 have custom error log levels, and custom retry policies.
 
     >>> import logging
+    >>> queue[1].failure_log_level == logging.CRITICAL
+    True
     >>> queue[2].failure_log_level == logging.CRITICAL
     True
-    >>> queue[3].failure_log_level == logging.CRITICAL
+    >>> queue[1].retry_policy_factory is zc.async.job.RetryCommonForever
     True
     >>> queue[2].retry_policy_factory is zc.async.job.RetryCommonForever
     True
-    >>> queue[3].retry_policy_factory is zc.async.job.RetryCommonForever
-    True
 
 If you have multiple workers, it is strongly suggested that you get the
 associated servers connected to a shared time server.

Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/testing.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -201,6 +201,32 @@
     else:
         assert False, 'no poll!'
 
+def wait_for_start(job, seconds=6):
+    for i in range(seconds * 10):
+        t = transaction.begin()
+        if job.status == zc.async.interfaces.ACTIVE:
+            break
+        time_sleep(0.1)
+    else:
+        assert False, 'job never started (%s)' % (job.status,)
+
+def wait_for_deactivation(dispatcher, seconds=6):
+    for i in range(seconds * 10):
+        if dispatcher.activated == False:
+            break
+        time_sleep(0.1)
+    else:
+        assert False, 'dispatcher never deactivated'
+
+def wait_for_death(da, seconds=6):
+    for i in range(seconds * 10):
+        _ = transaction.begin()
+        if da.dead:
+            break
+        time_sleep(0.1)
+    else:
+        assert False, 'dispatcher agent never died'
+
 def wait_for_result(job, seconds=6):
     for i in range(seconds * 10):
         t = transaction.begin()
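
Note on the testing.py helpers above: ``wait_for_start``,
``wait_for_deactivation`` and ``wait_for_death`` share one shape, which is to
poll a condition every tenth of a second and fail if it never becomes true.
A generic sketch of that shape follows. The ``wait_for`` name and its
parameters are hypothetical and not part of zc.async.testing, and the real
helpers also begin a new transaction on each pass where they need a fresh
view of the database. Written for Python 3.

```python
import time


def wait_for(predicate, seconds=6, interval=0.1, message='condition never met'):
    """Poll ``predicate`` until it returns true or ``seconds`` have elapsed."""
    attempts = max(1, int(round(seconds / interval)))
    for _ in range(attempts):
        if predicate():
            break
        time.sleep(interval)
    else:
        # The loop ran out of attempts without the predicate becoming true.
        raise AssertionError(message)


state = {'polls': 0}


def ready():
    # Hypothetical condition that becomes true on the third poll.
    state['polls'] += 1
    return state['polls'] >= 3


wait_for(ready, seconds=1, interval=0.01)
```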

Modified: zc.async/trunk/src/zc/async/tests.py
===================================================================
--- zc.async/trunk/src/zc/async/tests.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/tests.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -143,6 +143,7 @@
             'agent.txt',
             'dispatcher.txt',
             'subscribers.txt',
+            'parallel_serial.txt',
             'twisted.txt',
             'README_1.txt',
             'README_2.txt',

Modified: zc.async/trunk/src/zc/async/threadlocal.py
===================================================================
--- zc.async/trunk/src/zc/async/threadlocal.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/threadlocal.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -55,6 +55,7 @@
 
     job = None
     dispatcher = None
+    name = None
 
     def getJob(self):
         return self.job
@@ -68,6 +69,9 @@
     def getReactor(self):
         return self.dispatcher.reactor
 
+    def getAgentName(self):
+        return self.name
+
     def setLiveAnnotation(self, name, value, job=None):
         if self.job is None or self.dispatcher.reactor is None:
             raise ValueError('not initialized')

Modified: zc.async/trunk/src/zc/async/tips.txt
===================================================================
--- zc.async/trunk/src/zc/async/tips.txt	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/tips.txt	2008-09-04 02:53:09 UTC (rev 90777)
@@ -5,6 +5,11 @@
 General Tips and Tricks
 =======================
 
+* If you have multiple machines working as zc.async dispatchers, it is
+  strongly suggested that you get the associated servers connected to a shared
+  time server.  You generally don't want your machines to disagree by more than
+  a few seconds.
+
 * Avoid long transactions if possible.  Really try to avoid long transactions
   involving frequently written objects.  One possible strategy is to divide up
   your code into a job for low-conflict tasks and one or more jobs for

Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py	2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/utils.py	2008-09-04 02:53:09 UTC (rev 90777)
@@ -26,9 +26,12 @@
 import pytz
 import zope.bforest.periodic
 
+import zc.async.interfaces
 
-EXPLOSIVE_ERRORS = (SystemExit, KeyboardInterrupt)
 
+EXPLOSIVE_ERRORS = (SystemExit, KeyboardInterrupt,
+                    zc.async.interfaces.ReassignedError)
+
 SYSTEM_ERRORS = (ZEO.Exceptions.ClientDisconnected,
                  ZODB.POSException.POSKeyError)
 
@@ -244,7 +247,6 @@
             tm.abort()
             backoff_ct += 1
             if backoff_ct == 1:
-                # import pdb; pdb.set_trace()
                 log.log(level,
                         'first error while %s; will continue in %d seconds',
                         identifier, backoff, exc_info=True)


