[Checkins] SVN: zc.async/trunk/ Mostly: prevent/handle an agent that is incorrectly identified as "dead" by a sibling.
Gary Poster
gary at modernsongs.com
Wed Sep 3 22:53:10 EDT 2008
Log message for revision 90777:
Mostly: prevent/handle an agent that is incorrectly identified as "dead" by a sibling.
- We only want to claim a job if we are activated. Make the agent check the
``activated`` and ``dead`` attributes of the parent dispatcher before
claiming.
- When activating, also clean out jobs from the dispatcher's agents, just as
  with deactivating. This should protect against unusual race conditions in
  which the dispatcher got a job after being deactivated.
- Change dispatcher to ping before claiming jobs.
- When a ping reactivates a dispatcher, use the new method ``reactivate``
  rather than ``activate``. This fires a new ``DispatcherReactivated`` event.
- It's still theoretically possible (for instance, with a
  badly-behaved long commit that causes a sibling to believe that the
  process is dead) that an async worker process would be working on a
  job that it shouldn't be: the job has been taken away, and is now
  another process's responsibility. Now, whenever a process is about
  to start any work (especially a retry), it double-checks that the
  job is registered as being performed by itself. If not, the process
  aborts the transaction, writes an error log entry, and gives up on
  the job. Write conflict errors on the job should protect us from the
  edge cases in this story. XXX test
- Make ftesting try to join worker threads, in addition to the polling
  thread, to try to eliminate intermittent test-runner warnings in ftests
  that a thread is left behind. XXX test
Additional bits in this check-in:
- add tests for ``parallel`` and ``serial``, and fix an exposed bug in
  ``serial``
- beginnings of work on grok tutorial with zc.async. More coming.
- add some very basic PyPI trove metadata.
- add in z3c.recipe.tag to buildout
- copy "use NTP for multiple machines" to the "tips" section.
- add ``getAgentName`` to threadlocal object. This supports the main job for
this commit.
- move ``wait_for_start``, ``wait_for_deactivation``, and ``wait_for_death`` to
zc.async.testing from catastrophes.txt. Now also used in
parallel_serial.txt.
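The ``wait_for_*`` helpers moved into ``zc.async.testing`` all follow the
same poll-until-condition pattern; here is a generic sketch (a hypothetical
helper for illustration, not the actual zc.async.testing code, which also
re-begins a transaction before each check):

```python
import time

def wait_for(condition, seconds=6.0, interval=0.1,
             message='condition never met'):
    # Poll ``condition`` until it returns a true value or the deadline
    # passes. wait_for_start, wait_for_deactivation, and wait_for_death
    # are specializations of this pattern.
    deadline = time.time() + seconds
    while time.time() < deadline:
        if condition():
            return
        time.sleep(interval)
    raise AssertionError(message)

# Example: wait for a flag that a worker thread would normally set.
state = {'started': False}
state['started'] = True
wait_for(lambda: state['started'])
print('started')
```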
Changed:
_U zc.async/trunk/
U zc.async/trunk/buildout.cfg
U zc.async/trunk/setup.py
A zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt
U zc.async/trunk/sphinx/index.txt
U zc.async/trunk/src/zc/async/CHANGES.txt
A zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt
U zc.async/trunk/src/zc/async/README_1.txt
U zc.async/trunk/src/zc/async/TODO.txt
U zc.async/trunk/src/zc/async/agent.py
U zc.async/trunk/src/zc/async/agent.txt
U zc.async/trunk/src/zc/async/catastrophes.txt
U zc.async/trunk/src/zc/async/dispatcher.py
U zc.async/trunk/src/zc/async/dispatcher.txt
U zc.async/trunk/src/zc/async/ftesting.py
U zc.async/trunk/src/zc/async/interfaces.py
U zc.async/trunk/src/zc/async/job.py
A zc.async/trunk/src/zc/async/parallel_serial.txt
U zc.async/trunk/src/zc/async/queue.py
U zc.async/trunk/src/zc/async/queue.txt
U zc.async/trunk/src/zc/async/testing.py
U zc.async/trunk/src/zc/async/tests.py
U zc.async/trunk/src/zc/async/threadlocal.py
U zc.async/trunk/src/zc/async/tips.txt
U zc.async/trunk/src/zc/async/utils.py
-=-
Property changes on: zc.async/trunk
___________________________________________________________________
Name: svn:ignore
- develop-eggs
bin
parts
.installed.cfg
dist
TEST_THIS_REST_BEFORE_REGISTERING.txt
*.kpf
*.bbproject
+ develop-eggs
bin
parts
.installed.cfg
dist
TEST_THIS_REST_BEFORE_REGISTERING.txt
*.kpf
*.bbproject
tags
TAGS
ID
Modified: zc.async/trunk/buildout.cfg
===================================================================
--- zc.async/trunk/buildout.cfg 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/buildout.cfg 2008-09-04 02:53:09 UTC (rev 90777)
@@ -4,6 +4,8 @@
test
z3interpreter
z3test
+ tags
+unzip = true
develop = .
@@ -34,3 +36,7 @@
recipe = zc.recipe.egg
eggs = zc.async [z3]
interpreter = z3py
+
+[tags]
+recipe = z3c.recipe.tag:tags
+eggs = zc.async
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/setup.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -91,6 +91,7 @@
'ZODB3',
'pytz',
'rwproperty',
+ 'setuptools',
'uuid',
'zc.queue',
'zc.dict>=1.2.1',
@@ -115,5 +116,12 @@
'zope.app.component',
'simplejson',
]},
- url='http://packages.python.org/zc.async'
+ url='http://packages.python.org/zc.async',
+ classifiers=[
+ "Development Status :: 5 - Production/Stable",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: Zope Public License",
+ "Operating System :: OS Independent",
+ "Programming Language :: Python",
+ "Framework :: ZODB"],
)
Added: zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt
===================================================================
--- zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt (rev 0)
+++ zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -0,0 +1 @@
+link ../src/zc/async/QUICKSTART_2_GROK.txt
\ No newline at end of file
Property changes on: zc.async/trunk/sphinx/QUICKSTART_2_GROK.txt
___________________________________________________________________
Name: svn:special
+ *
Modified: zc.async/trunk/sphinx/index.txt
===================================================================
--- zc.async/trunk/sphinx/index.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/sphinx/index.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -93,10 +93,8 @@
:maxdepth: 1
QUICKSTART_1_VIRTUALENV
+ QUICKSTART_2_GROK
-XXX QUICKSTART_ZC_BUILDOUT
-XXX QUICKSTART_GROK
-
Documentation
-------------
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/CHANGES.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -2,7 +2,7 @@
Changes
=======
-1.4.2 (2008-??-??)
+1.5.0 (2008-??-??)
==================
- Documentation improvements. Converted documentation into Sphinx system.
@@ -18,17 +18,45 @@
- Added zc.async.partial.Partial for backward compatibility purposes.
-- Fix support for Twisted installed reactor
+- Fix support for Twisted installed reactor.
-- Fix retry behavior for parallel and serial jobs XXX NEEDS TEST
+- Fix retry behavior for parallel and serial jobs
- Tweaked the uuid.txt to mention zdaemon/supervisor rather than Zope 3.
-- Fixed some bugs in egg creation
+- Fixed some bugs in egg creation.
- Changed quotas to not use a container that has conflict resolution, since
these values should be a strict maximum.
+- We only want to claim a job if we are activated. Make the agent check the
+ ``activated`` and ``dead`` attributes of the parent dispatcher before
+ claiming.
+
+- When activating, also clean out jobs from the dispatcher's agents, just as
+ with deactivating. This should protect from unusual race conditions in
+ which the dispatcher got a job after being deactivated.
+
+- Change dispatcher to ping before claiming jobs.
+
+- when a ping reactivates a dispatcher, use new method ``reactivate`` rather
+ than ``activate``. This fires a new ``DispatcherReactivated`` event.
+
+- It's still theoretically possible (for instance, with a
+ badly-behaved long commit that causes a sibling to believe that the
+ process is dead) that an async worker process would be working on a
+ job that it shouldn't be. For instance, the job has been taken away,
+ and is another process' responsibility now. Now, whenever a
+ process is about to start any work (especially a retry), it should
+ double-check that the job is registered as being performed by itself.
+ If not, the process should abort the transaction, make an error
+ log, and give up on the job. Write conflict errors on the job should
+ protect us from the edge cases in this story. XXX test
+
+- Make ftesting try to join worker threads, in addition to polling thread,
+ to try to eliminate intermittent test-runner warnings in ftests that a
+ thread is left behind. XXX test
+
1.4.1 (2008-07-30)
==================
Added: zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt
===================================================================
--- zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt (rev 0)
+++ zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -0,0 +1,183 @@
+=====================
+Quickstart with Grok_
+=====================
+
+Goals
+=====
+
+In this quickstart, we will use zc.async to make a small web application that
+XXX
+
+XXX For simplicity, we'll assume that we are making several instances on the
+same machine, such as you might do with a few processors at your disposal. To
+get the true advantage of high availability in production, you'd want at least
+two boxes, with a deployment of a ZEO server (or equivalent, for RelStorage),
+some kind of redundancy for your database (ZRS, slony) and instructions for
+each box on how to connect to the ZEO primary.
+
+This quickstart builds on the :ref:`quickstart-with-virtualenv`. I suggest you
+read that through before this one.
+
+- That previous quickstart introduces |async| through the Python interpreter
+ for a very casual and quick start.
+
+- It also is more "pure-Python" with very little understanding needed of
+ additional frameworks to follow and use the examples.
+
+This quickstart instead uses the following somewhat "heavier" technologies.
+
+- |zc.buildout|_ is a way of producing repeatable software build-outs, for
+ development and conceivably for deployment. It is written in Python, but is
+ not Python-specific, and it has found use as a ``make`` replacement for many
+ projects.
+
+- Grok_ is a web framework emerging from "Zope 3" technologies. From their
+ website:
+
+ Grok is a web application framework for Python developers. It is aimed at
+ both beginners and very experienced web developers. Grok has an emphasis on
+ agile development. Grok is easy and powerful.
+
+This guide, then, takes a somewhat slower definition of "quick" for its
+"quickstart", in exchange for more guidance and care with a view towards
+production-readiness.
+
+.. _Grok: http://grok.zope.org/
+
+.. |async| replace:: ``zc.async``
+
+.. _`async`: http://pypi.python.org/pypi/zc.async
+
+.. |zc.buildout| replace:: ``zc.buildout``
+
+.. _`zc.buildout`: http://pypi.python.org/pypi/zc.buildout
+
+Prerequisites
+=============
+
+.. sidebar:: Building Python 2.4.5 on OS X Leopard
+
+ Unfortunately, building a clean, standalone workable Python 2.4.5 on OS X is
+ not obvious. This is what I recommend, if you are working on that platform.
+
+ First you need macports. Go to macports.org and download the newest
+ version. It doesn't seem to set up the manual path correctly, so after the
+ installation add this to your ~/.profile (or in a similar place)::
+
+ export MANPATH=/opt/local/man:$MANPATH
+
+ You'll need a new terminal session (or other shell magic if you know it) for
+ these changes to take effect. The easiest thing to do is close the shell
+ you are working in and open a new one.
+
+ Download a source distribution of Python 2.4.5. You may have your own
+ approach as to where to put things, but I'll go with this pattern in this
+ document: ~/src will hold expanded source trees, ~/opt will hold our local
+ Python, and we'll develop in ~/dev.
+
+ We will want readline and need zlib from macports.
+
+ ::
+
+ $ sudo port -c install readline
+ $ sudo port -c install zlib
+
+ Now we'll do the usual dance, with a couple of ugly extra steps.
+
+ **Note: replace ``/Users/gary/opt/py`` in the below with your own desired
+ location!**
+
+ ::
+
+ $ MACOSX_DEPLOYMENT_TARGET=10.5 ./configure \
+ --prefix=/Users/gary/opt/py \
+ LDFLAGS=-L/opt/local/lib \
+ OPT=-I/opt/local/include
+ $ make
+ $ make install
+
+ Now, given my ``--prefix``, I'll find my python in
+ ``/Users/gary/opt/py/bin/python``.
+
+Grok requires Python 2.4. Moreover, for more repeatable installations, many
+developers strongly recommend using a "clean", non-system Python, to reduce the
+probability of unnecessary or spurious problems (in your software *or* in your
+system!). Therefore, consider building your own Python 2.4 for your
+development.
+
+We'll also expect that your Python has |easy_install|_. If it doesn't, you can
+just download `ez_setup.py`_ and then run it with your local, development
+Python (e.g., ``~/opt/py/bin/python ez_setup.py``). This will install the
+easy_install command for your development Python in the same bin directory as
+your Python (e.g., ``~/opt/py/bin/easy_install``).
+
+.. |easy_install| replace:: ``easy_install``
+
+.. _`easy_install`: http://peak.telecommunity.com/DevCenter/EasyInstall
+
+.. _`ez_setup.py`: http://peak.telecommunity.com/dist/ez_setup.py
+
+grokproject_
+============
+
+.. sidebar:: |zc.buildout| Conveniences
+
+ You may want to consider the following conveniences if you are building many
+ projects with |zc.buildout|. They make zc.buildout keep two shared
+ collections across all of your zc.buildout projects. This can significantly
+ speed up the time to buildout new applications. One shared collection is a
+ download cache of source distributions and eggs. The other is an egg cache
+ only, for both the downloaded eggs and the eggs generated on your machine.
+
+ In your home directory, make a ``.buildout`` directory. In that directory,
+ make two sub-directories, ``eggs`` and ``download-cache``. Also in
+ ``.buildout``, create a file named ``default.cfg`` with the following
+ content, where ``/Users/gary`` is replaced with the path to your home
+ directory::
+
+ [buildout]
+ eggs-directory=/Users/gary/.buildout/eggs
+ download-cache=/Users/gary/.buildout/download-cache
+
+ There are many other possible settings to make here (for instance, we could
+ specify the clean Python you built here), but these are all I
+ currently bother with.
+
+ It is also worth mentioning that, as of this writing, setuptools builds eggs
+ in such a way as to confuse the Python debugger. If you use the Python
+ debugger and discover that you want to see the lines in an egg and can't,
+ the following line (or something like it) will help for non-zipped eggs::
+
+ find ~/.buildout/eggs-aside/ -name '*.pyc' -exec rm {} \;
+
+Grok has a pleasantly convenient way to start a project. It is called
+grokproject_. Use your local Python's ``easy_install`` to install it. For
+instance, I might type ``~/opt/py/bin/easy_install grokproject``.
+
+After it runs, it should have installed the ``grokproject`` command in the same
+bin directory as your local Python (e.g., ``~/opt/py/bin/grokproject``).
+
+.. _grokproject: http://pypi.python.org/pypi/grokproject
+
+Skeleton
+========
+
+Now we will use grokproject_ to make a skeleton of our package. Let's just
+call the project "quickstart". Go to a directory in which you want to develop
+our package. Then use the newly installed ``grokproject`` command to create
+
+XXX
+
+- include zc.async in setup.py; mention versions.cfg
+
+- set up ZEO
+
+- set up multiple instances
+
+- zope.app.testing = 3.4.1 -> 3.4.2
+
+- set up interpreter
+
+- set up z3monitor
+
+- make separate debug instance
Property changes on: zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: zc.async/trunk/src/zc/async/README_1.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_1.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/README_1.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -970,6 +970,7 @@
'failed': False,
'poll id': ...,
'quota names': (),
+ 'reassigned': False,
'result': None,
'started': datetime.datetime(...),
'thread': ...}
@@ -1011,6 +1012,7 @@
'failed': False,
'poll id': ...,
'quota names': (),
+ 'reassigned': False,
'result': '42',
'started': datetime.datetime(...),
'thread': ...}
@@ -1164,6 +1166,14 @@
.. [#stop_usage_reactor]
+ >>> reactor.stop()
+ >>> zc.async.testing.wait_for_deactivation(dispatcher)
+ >>> for queue_pools in dispatcher.queues.values():
+ ... for name, pool in queue_pools.items():
+ ... pool.setSize(0)
+ ... for thread in pool.threads:
+ ... thread.join(3)
+ ...
>>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
{'failed': 2,
'longest active': None,
@@ -1177,4 +1187,3 @@
'statistics start': datetime.datetime(2006, 8, 10, 16, ...),
'successful': 52,
'unknown': 0}
- >>> reactor.stop()
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/TODO.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -1,9 +1,9 @@
- fix up tips so that it looks better
-- finish quickstart
-- write a zc.buildout quickstart
-- write a grok quickstart
-- write test for using installed twisted reactor
-- write test for interrupt retry of parallel/serial
+- write a zc.buildout/grok quickstart
+- zc.z3monitor:
+ + asyncdb count pending callable:foo*
+ + asyncdb count completed agent:entry5
+ + asyncdb countall completed
Improvements
Modified: zc.async/trunk/src/zc/async/agent.py
===================================================================
--- zc.async/trunk/src/zc/async/agent.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/agent.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -69,6 +69,16 @@
return res
def claimJob(self):
+ if not self.parent.activated or self.parent.dead:
+ # we don't want to claim a job unless we are activated.
+ # Normally, this should be the case, but in unusual
+ # circumstances, such as very long commits causing the
+ # ping to not be able to commit, we might get in this
+ # unfortunate circumstance.
+ # TODO: we would like to have a read conflict error if we read
+ # activated but it changed beneath us. If the ZODB grows a gesture
+ # to cause this, use it.
+ return None
if len(self._data) < self.size:
res = self.chooser(self)
if res is not None:
Modified: zc.async/trunk/src/zc/async/agent.txt
===================================================================
--- zc.async/trunk/src/zc/async/agent.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/agent.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -94,6 +94,36 @@
>>> len(agent)
3
+An agent may not claim a job if its parent is considered deactivated or dead.
+This is a safeguard for unlikely situations in which a parent dispatcher has
+been incorrectly labeled as dead because of ping times (see catastrophes.txt
+for details).
+
+ >>> da.deactivate()
+ >>> len(agent)
+ 0
+ >>> bool(da.activated)
+ False
+ >>> print agent.claimJob()
+ None
+ >>> da.activate()
+ >>> import datetime
+ >>> import pytz
+ >>> da.activated = da.last_ping.value = (datetime.datetime.now(pytz.UTC) -
+ ... datetime.timedelta(days=365))
+ >>> bool(da.activated)
+ True
+ >>> da.dead
+ True
+ >>> print agent.claimJob()
+ None
+ >>> da.deactivate()
+ >>> da.activate()
+ >>> agent.claimJob() is job5
+ True
+ >>> len(agent)
+ 1
+
This particular agent invites you to provide a function to choose jobs.
The default one simply chooses the first available job in the queue.
Modified: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/catastrophes.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -513,7 +513,7 @@
>>> transaction.commit()
>>> dispatcher = zc.async.dispatcher.get()
>>> poll = zc.async.testing.get_poll(dispatcher)
- >>> wait_for_start(job)
+ >>> zc.async.testing.wait_for_start(job)
In this scenario, ``wait_for_me`` is a job that, the first time it is run, will
"unexpectedly" be lost while the dispatcher stops working. ``handle_result``
@@ -525,7 +525,7 @@
clean up its dispatcher agents, and job.handleInterrupt() goes into the queue.
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
- >>> wait_to_deactivate(dispatcher)
+ >>> zc.async.testing.wait_for_deactivation(dispatcher)
>>> _ = transaction.begin()
>>> job.status == zc.async.interfaces.ACTIVE
True
@@ -586,7 +586,7 @@
>>> transaction.commit()
>>> dispatcher = zc.async.dispatcher.get()
>>> poll = zc.async.testing.get_poll(dispatcher)
- >>> wait_for_start(job)
+ >>> zc.async.testing.wait_for_start(job)
Now we'll "crash" the dispatcher.
@@ -679,11 +679,10 @@
0
>>> import datetime
>>> da.ping_death_interval = datetime.timedelta(seconds=1)
- >>> da.dead
- True
+ >>> transaction.commit()
+ >>> zc.async.testing.wait_for_death(da)
>>> bool(da.activated)
True
- >>> transaction.commit()
After the next poll, the dispatcher will have cleaned up its old tasks in the
same way we saw in the previous example. The job's ``handleInterrupt`` method
@@ -747,7 +746,7 @@
>>> transaction.commit()
>>> dispatcher = zc.async.dispatcher.get()
>>> poll = zc.async.testing.get_poll(dispatcher)
- >>> wait_for_start(job)
+ >>> zc.async.testing.wait_for_start(job, seconds=30) # XXX not sure why so long
Now we'll start up an alternate dispatcher.
@@ -764,6 +763,7 @@
>>> da.ping_death_interval = datetime.timedelta(seconds=60)
>>> transaction.commit()
+ >>>
Now we'll "crash" the dispatcher.
@@ -771,6 +771,8 @@
... # cleanup
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
>>> dispatcher.thread.join(3)
+ >>> dispatcher.thread.isAlive()
+ False
As discussed in the previous example, the polling hasn't timed out yet, so the
alternate dispatcher can't know that the first one is dead. Therefore, the job
@@ -808,11 +810,10 @@
datetime.timedelta(0, 60)
>>> import datetime
>>> da.ping_death_interval = datetime.timedelta(seconds=1)
- >>> da.dead
- True
+ >>> transaction.commit()
+ >>> zc.async.testing.wait_for_death(da)
>>> bool(da.activated)
True
- >>> transaction.commit()
After the second dispatcher gets a poll--a chance to notice--it will have
cleaned up the first dispatcher's old tasks in the same way we saw in the
@@ -878,25 +879,6 @@
>>> import transaction
>>> _ = transaction.begin()
- >>> from time import sleep as time_sleep # we import in this manner so
- ... # that our testing monkey-patch of time.sleep does not affect our tests
- >>> def wait_for_start(job):
- ... for i in range(60):
- ... t = transaction.begin()
- ... if job.status == zc.async.interfaces.ACTIVE:
- ... break
- ... time_sleep(0.1)
- ... else:
- ... assert False, 'job never started'
-
- >>> def wait_to_deactivate(dispatcher):
- ... for i in range(60):
- ... if dispatcher.activated == False:
- ... break
- ... time_sleep(0.1)
- ... else:
- ... assert False, 'dispatcher never deactivated'
-
.. [#cleanup1]
>>> lock.release()
Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/dispatcher.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -61,6 +61,7 @@
def perform_thread(self):
zc.async.local.dispatcher = self.dispatcher
+ zc.async.local.name = self.name # this is the name of this pool's agent
conn = self.dispatcher.db.open()
try:
job_info = self.queue.get()
@@ -141,6 +142,11 @@
transaction.abort() # retry forever (!)
else:
break
+ except zc.async.interfaces.ReassignedError:
+ transaction.abort()
+ info['reassigned'] = True
+ # will need to get next job_info and continue
+ # EXPLOSIVE_ERRORS includes Reassigned: order is important
except zc.async.utils.EXPLOSIVE_ERRORS:
transaction.abort()
raise
@@ -165,7 +171,7 @@
info['completed'] = datetime.datetime.utcnow()
finally:
zc.async.local.job = None # also in job (here for paranoia)
- transaction.abort()
+ transaction.abort() # (also paranoia)
zc.async.utils.tracelog.info(
'completed in thread %d: %s',
info['thread'], info['call'])
@@ -229,8 +235,8 @@
thread = None # this is just a placeholder that other code can use the
# way that zc.async.subscribers.ThreadedDispatcherInstaller.__call__ does.
- def __init__(self, db, reactor=None, poll_interval=5, uuid=None, jobs_size=200,
- polls_size=400):
+ def __init__(self, db, reactor=None, poll_interval=5, uuid=None,
+ jobs_size=200, polls_size=400):
if uuid is None:
uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
if uuid in _dispatchers:
@@ -331,6 +337,10 @@
self._activated.add(queue._p_oid)
else:
continue
+ identifier = 'committing ping for UUID %s' % (self.UUID,)
+ zc.async.utils.try_five_times(
+ lambda: queue.dispatchers.ping(self.UUID), identifier,
+ transaction)
queue_info = poll_info[queue.name] = {}
pools = self.queues.get(queue.name)
if pools is None:
@@ -376,7 +386,8 @@
'call': repr(job),
'started': None,
'completed': None,
- 'thread': None}
+ 'thread': None,
+ 'reassigned': False}
started_jobs.append(info)
dbname = getattr(
job._p_jar.db(), 'database_name', None)
@@ -386,10 +397,6 @@
pool.queue.put(
(job._p_oid, dbname, info))
job = self._getJob(agent)
- identifier = 'committing ping for UUID %s' % (self.UUID,)
- zc.async.utils.try_five_times(
- lambda: queue.dispatchers.ping(self.UUID), identifier,
- transaction)
if len(pools) > len(queue_info):
conn_delta = 0
for name, pool in pools.items():
Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/dispatcher.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -295,6 +295,7 @@
'failed': False,
'poll id': ...,
'quota names': (),
+ 'reassigned': False,
'result': '42',
'started': datetime.datetime(...),
'thread': ...}
@@ -412,6 +413,40 @@
>>> wait_for_result(job4)
"reply is 'HIYA'. Locally it is MISSING."
+Sometimes, hopefully rarely-to-never, the dispatcher polls don't happen
+frequently enough. This might happen with a very long database commit, for
+instance, that locks out other changes until it is done. In this case, a
+dispatcher is incorrectly regarded as dead (see ``catastrophes.txt`` for a
+detailed discussion). The dispatcher polls before it gets jobs, which
+informs the queue that it is not actually dead. Then the agent is still
+willing to claim jobs.
+
+ >>> job5 = queue.put(zc.async.job.Job(operator.mod, 85, 43))
+ >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
+ >>> import pytz
+ >>> da.activated = da.last_ping.value = (datetime.datetime.now(pytz.UTC) -
+ ... datetime.timedelta(days=365))
+ >>> da.dead
+ True
+ >>> da.deactivate()
+ >>> bool(da.activated)
+ False
+ >>> transaction.commit()
+ >>> ignore = get_poll(dispatcher)
+ >>> ignore = transaction.begin()
+ >>> bool(da.activated)
+ True
+ >>> evs = eventtesting.getEvents(
+ ... zc.async.interfaces.IDispatcherReactivated)
+ >>> evs # doctest: +ELLIPSIS
+ [<zc.async.interfaces.DispatcherReactivated object at ...>]
+ >>> evs[0].object._p_oid == da._p_oid
+ True
+ >>> job5.status != zc.async.interfaces.PENDING
+ True
+ >>> wait_for_result(job5)
+ 42
+
We can analyze the work the dispatcher has done. The records for this generally
only go back about ten or twelve minutes--just enough to get a feel for the
current health of the dispatcher. Use the log if you want more long-term
@@ -425,10 +460,10 @@
'shortest active': None,
'shortest failed': (..., 'unnamed'),
'shortest successful': (..., 'unnamed'),
- 'started': 8,
+ 'started': 9,
'statistics end': datetime.datetime(...),
'statistics start': datetime.datetime(...),
- 'successful': 6,
+ 'successful': 7,
'unknown': 0}
We can get a report on the reactor's status.
Modified: zc.async/trunk/src/zc/async/ftesting.py
===================================================================
--- zc.async/trunk/src/zc/async/ftesting.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/ftesting.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -49,4 +49,9 @@
del dispatcher._debug_handler
dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
dispatcher.thread.join(3)
+ for queue_pools in dispatcher.queues.values():
+ for name, pool in queue_pools.items():
+ pool.setSize(0)
+ for thread in pool.threads:
+ thread.join(3)
zc.async.dispatcher.clear()
Modified: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/interfaces.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -142,6 +142,12 @@
class DispatcherDeactivated(AbstractObjectEvent):
zope.interface.implements(IDispatcherDeactivated)
+class IDispatcherReactivated(IObjectEvent):
+ """Dispatcher was reactivated after mistaken deactivation"""
+
+class DispatcherReactivated(AbstractObjectEvent):
+ zope.interface.implements(IDispatcherReactivated)
+
class IObjectAdded(IObjectEvent):
"""Object was added to the database"""
@@ -173,6 +179,13 @@
This is almost certainly a programmer error."""
+class ReassignedError(Exception):
+ """The job has been reassigned to another process.
+
+ This should only happen when a polling timeout has made a not-dead process
+ appear to be dead to a sibling."""
+
+
class IAbstractJob(zope.interface.Interface):
parent = zope.interface.Attribute(
Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/job.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -591,15 +591,19 @@
policy, self, call_name)
return None
identifier = 'getting result for %s retry for %r' % (call_name, self)
- return zc.async.utils.never_fail(lambda: call(*args), identifier, tm)
+ res = zc.async.utils.never_fail(lambda: call(*args), identifier, tm)
+ self._check_reassigned((zc.async.interfaces.ACTIVE,)) # will raise
+ # exception if necessary
+ return res
def __call__(self, *args, **kwargs):
- if self.status not in (zc.async.interfaces.NEW,
- zc.async.interfaces.ASSIGNED):
+ statuses = (zc.async.interfaces.NEW, zc.async.interfaces.ASSIGNED)
+ if self.status not in statuses:
raise zc.async.interfaces.BadStatusError(
'can only call a job with NEW or ASSIGNED status')
tm = transaction.interfaces.ITransactionManager(self)
def prepare():
+ self._check_reassigned(statuses)
self._status_id = 1 # ACTIVE
self._active_start = datetime.datetime.now(pytz.UTC)
effective_args = list(args)
@@ -692,6 +696,7 @@
self, res)
res = failure
def complete():
+ self._check_reassigned((zc.async.interfaces.ACTIVE,))
self._result = res
self._status_id = 2 # CALLBACKS
self._active_end = datetime.datetime.now(pytz.UTC)
@@ -794,6 +799,31 @@
queue.put(self, begin_after=now+when)
return self
+ def _check_reassigned(self, expected_statuses):
+ agent = self.agent
+ res = self.status not in expected_statuses or (
+ zc.async.interfaces.IAgent.providedBy(agent) and
+ not zc.async.interfaces.IJob.providedBy(self._result) and
+ zc.async.local.getAgentName() is not None and
+ (zc.async.local.getAgentName() != agent.name or
+ zc.async.local.getDispatcher().UUID != agent.parent.UUID))
+ if res:
+ # the only known scenario for this to occur is the following.
+ # agent took job. dispatcher gave it to a thread. While
+ # performing the job, the poll was unable to write to the db,
+ # perhaps because of a database disconnect or because of a
+ # too-long commit in another process or thread. Therefore,
+ # A sibling has noticed that this agent seems to have died
+ # and put this job back in the queue, where it has been claimed
+ # by another process/agent.
+ # It's debatable whether this is CRITICAL or ERROR level. We'll
+ # go with ERROR for now.
+ zc.async.utils.log.error(
+ 'Job %r was reassigned. Likely cause was that polling was '
+ 'unable to occur as regularly as expected, perhaps because of '
+ 'long commit times in the application.', self)
+ raise zc.async.interfaces.ReassignedError()
+
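For readers skimming the diff, the logic of ``_check_reassigned`` above can be sketched without the ZODB and thread-local machinery. This is a minimal sketch: ``Job``, ``ReassignedError``, and the attribute names here are simplified stand-ins, not the real zc.async classes.

```python
class ReassignedError(Exception):
    """Raised when a job discovers another agent now owns it."""

class Job:
    def __init__(self, status, agent_name, dispatcher_uuid):
        self.status = status
        self.agent_name = agent_name            # agent the job is registered to
        self.dispatcher_uuid = dispatcher_uuid  # dispatcher that owns that agent

    def check_reassigned(self, expected_statuses, local_agent, local_uuid):
        # Abort if the job left the expected status, or if the thread-local
        # agent/dispatcher no longer matches the job's registration: a
        # sibling may have declared this process dead and handed the job
        # to another agent.
        if (self.status not in expected_statuses or
                local_agent != self.agent_name or
                local_uuid != self.dispatcher_uuid):
            raise ReassignedError()

job = Job('ACTIVE', 'main', 'uuid-1')
job.check_reassigned(('ACTIVE',), 'main', 'uuid-1')  # still ours: no error
try:
    job.check_reassigned(('ACTIVE',), 'main', 'uuid-2')  # another dispatcher
except ReassignedError:
    print('reassigned')  # -> reassigned
```

The real method additionally skips the ownership comparison when the result is itself an ``IJob`` or when no agent name is set in the thread-local state.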
def _set_result(self, res, tm, data_cache=None):
# returns whether to call ``resumeCallbacks``
callback = True
@@ -814,7 +844,8 @@
self._active_end = datetime.datetime.now(pytz.UTC)
if self._retry_policy is not None and data_cache:
self._retry_policy.updateData(data_cache)
- tm.commit()
+ tm.commit() # this should raise a ConflictError if the job has been
+ # reassigned.
return callback
def _log_completion(self, res):
@@ -921,11 +952,13 @@
if self.status != zc.async.interfaces.CALLBACKS:
raise zc.async.interfaces.BadStatusError(
'can only resumeCallbacks on a job with CALLBACKS status')
+ self._check_reassigned((zc.async.interfaces.CALLBACKS,))
callbacks = list(self.callbacks)
tm = transaction.interfaces.ITransactionManager(self)
length = 0
while 1:
for j in callbacks:
+ self._check_reassigned((zc.async.interfaces.CALLBACKS,))
if zc.async.interfaces.ICallbackProxy.providedBy(j):
j = j.getJob(self.result)
status = j.status
@@ -991,7 +1024,12 @@
queue.put(postprocess)
def _schedule_serial(*jobs, **kw):
- _queue_next(zc.async.local.getJob())
+ for ix, job in enumerate(jobs): # important for interrupts
+ if job.status == zc.async.interfaces.NEW:
+ break
+ else:
+ ix += 1
+ _queue_next(zc.async.local.getJob(), ix)
return kw['postprocess']
def serial(*jobs, **kw):
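The fixed ``_schedule_serial`` above relies on Python's ``for``/``else``: the ``else`` clause runs only when the loop finishes without ``break``, i.e. when no job is still NEW, in which case the index is advanced past the end. A minimal sketch, with plain strings standing in for job statuses:

```python
def first_new_index(statuses):
    # Mirrors the loop in _schedule_serial: break at the first NEW entry.
    # The else clause runs only if the loop never breaks, meaning every
    # job has already started, so point past the end of the sequence.
    for ix, status in enumerate(statuses):
        if status == 'NEW':
            break
    else:
        ix = len(statuses)
    return ix

print(first_new_index(['COMPLETED', 'NEW', 'NEW']))  # -> 1
print(first_new_index(['COMPLETED', 'COMPLETED']))   # -> 2
```

Starting from the first NEW job, rather than always from the front, is what makes the helper safe to re-run after an interrupt.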
Added: zc.async/trunk/src/zc/async/parallel_serial.txt
===================================================================
--- zc.async/trunk/src/zc/async/parallel_serial.txt (rev 0)
+++ zc.async/trunk/src/zc/async/parallel_serial.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -0,0 +1,234 @@
+The ``parallel`` and ``serial`` helpers are described in README_1.txt, the
+usage document. This document is a maintainer/test document meant to show
+how ``parallel`` and ``serial`` behave when they are interrupted.
+
+The two scenarios we want to show, for each helper, are an interruption during
+one of the component jobs, and an interruption during the postprocess.
+
+First, we set up a world and some components for the test scenario. If you
+are reading this for the first time, you might want to skip ahead to the next
+text block to get a broad idea of what is going on.
+
+ >>> import ZODB.FileStorage
+ >>> storage = ZODB.FileStorage.FileStorage(
+ ... 'zc_async.fs', create=True)
+ >>> from ZODB.DB import DB
+ >>> db = DB(storage)
+ >>> import zc.async.configure
+ >>> zc.async.configure.base()
+ >>> zc.async.configure.start(db, poll_interval=0.1)
+ >>> conn = db.open()
+ >>> import zc.async.interfaces
+ >>> queue = zc.async.interfaces.IQueue(conn)
+
+ >>> import zc.async.job
+ >>> import transaction
+ >>> import threading
+ >>> def normal_component():
+ ... return 25
+ >>> component_lock = threading.Lock()
+ >>> component_lock.acquire()
+ True
+ >>> component_fail_flag = True
+ >>> def pause_component():
+ ... global component_fail_flag
+ ... if component_fail_flag:
+ ... component_fail_flag = False
+ ... component_lock.acquire()
+ ... component_lock.release() # so we can use the same lock again later
+ ... raise SystemExit() # this will cause the worker thread to exit
+ ... else:
+ ... component_lock.acquire() # for the next run
+ ... component_fail_flag = True # for the next run
+ ... return 17
+ ...
+ >>> postprocess_lock = threading.Lock()
+ >>> postprocess_lock.acquire()
+ True
+ >>> postprocess_fail_flag = True
+ >>> def pause_postprocess(job1, job2):
+ ... global postprocess_fail_flag
+ ... if postprocess_fail_flag:
+ ... postprocess_fail_flag = False
+ ... postprocess_lock.acquire()
+ ... postprocess_lock.release() # so we can use the same lock again later
+ ... raise SystemExit() # this will cause the worker thread to exit
+ ... else:
+ ... postprocess_lock.acquire() # for the next run
+ ... postprocess_fail_flag = True # for the next run
+ ... return job1.result + job2.result
+ ...
+ >>> def stop_dispatcher(dispatcher, lock=None):
+ ... dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+ ... zc.async.testing.wait_for_deactivation(dispatcher)
+ ... dispatcher.thread.join(3)
+ ... if lock is not None:
+ ... lock.release()
+ ... for queue_pools in dispatcher.queues.values():
+ ... for name, pool in queue_pools.items():
+ ... pool.setSize(0)
+ ... for thread in pool.threads:
+ ... thread.join(3)
+ ...
+
+First we'll test ``serial``. We stop the worker once while it is working on
+one of the serialized jobs, and once while it is working on the postprocess.
+Both examples use two soft interrupts (i.e., with clean shut-down; see
+catastrophes.txt).
+
+We create a job with two parts, the second of which will be interrupted; and
+a postprocess, which will also be interrupted. We let it run through the first
+inner job, and start the second.
+
+ >>> job = queue.put(zc.async.job.serial(normal_component, pause_component,
+ ... postprocess=pause_postprocess))
+ >>> transaction.commit()
+ >>> import zc.async.testing
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> zc.async.testing.wait_for_start(job.args[1])
+
+We interrupt the dispatcher. Because it is a clean shut-down, we have two
+``handleInterrupt`` jobs waiting afterwards. One is for the main job
+(``schedule_serial``), and the other is for the second inner job
+(``pause_component``).
+
+ >>> stop_dispatcher(dispatcher, component_lock) # INTERRUPT 1
+
+ >>> _ = transaction.begin()
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+ >>> job.args[1].status == zc.async.interfaces.ACTIVE
+ True
+ >>> len(queue)
+ 2
+ >>> queue[0] # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[1] # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[0].callable # doctest: +ELLIPSIS
+ <bound method Job.handleInterrupt of <...Job... ``...schedule_serial...``>>
+ >>> queue[1].callable # doctest: +ELLIPSIS
+ <bound method Job.handleInterrupt of <...Job... ``...pause_component()``>>
+
+We start up the dispatcher again (actually, it is a new one, but it is
+effectively the same in terms of its UUID).
+
+ >>> zc.async.dispatcher.clear()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> zc.async.testing.wait_for_start(job.kwargs['postprocess'])
+
+We interrupt the dispatcher again. This time the postprocess is interrupted.
+
+ >>> stop_dispatcher(dispatcher, postprocess_lock) # INTERRUPT 2
+
+ >>> _ = transaction.begin()
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+ >>> job.kwargs['postprocess'].status == zc.async.interfaces.ACTIVE
+ True
+ >>> len(queue)
+ 2
+ >>> queue[0] # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[1] # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[0].callable # doctest: +ELLIPSIS
+ <bound method Job.handleInterrupt of <...Job... ``...schedule_serial...``>>
+ >>> queue[1].callable # doctest: +ELLIPSIS
+ <...method Job.handleInterrupt of <...Job... ``...pause_postprocess(...``>>
+
+Finally we restart without subsequent interruptions. The job runs
+successfully to completion.
+
+ >>> zc.async.dispatcher.clear()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
+ >>> dispatcher = zc.async.dispatcher.get()
+
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+
+Now we'll do the same thing for ``parallel``. You'll notice that this is
+virtually identical, apart from a slightly different setup.
+
+We create a job with two parts, the second of which will be interrupted; and
+a postprocess, which will also be interrupted. We let it run through the first
+inner job, and start the second.
+
+ >>> job = queue.put(zc.async.job.parallel(
+ ... pause_component, normal_component, postprocess=pause_postprocess))
+ >>> transaction.commit()
+ >>> import zc.async.testing
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> zc.async.testing.wait_for_result(job.args[1])
+ 25
+ >>> zc.async.testing.wait_for_start(job.args[0])
+
+We interrupt the dispatcher. Because it is a clean shut-down, we have two
+``handleInterrupt`` jobs waiting afterwards. One is for the main job
+(``schedule_parallel``), and the other is for the first inner job
+(``pause_component``).
+
+ >>> stop_dispatcher(dispatcher, component_lock) # INTERRUPT 1
+
+ >>> _ = transaction.begin()
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+ >>> job.args[0].status == zc.async.interfaces.ACTIVE
+ True
+ >>> len(queue)
+ 2
+ >>> queue[0] # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[1] # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[0].callable # doctest: +ELLIPSIS
+ <bound method Job.handleInterrupt of <...Job... ``...schedule_parallel...``>>
+ >>> queue[1].callable # doctest: +ELLIPSIS
+ <bound method Job.handleInterrupt of <...Job... ``...pause_component()``>>
+
+We start up the dispatcher again (actually, it is a new one, but it is
+effectively the same in terms of its UUID).
+
+ >>> zc.async.dispatcher.clear()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
+ >>> dispatcher = zc.async.dispatcher.get()
+ >>> zc.async.testing.wait_for_start(job.kwargs['postprocess'])
+
+We interrupt the dispatcher again. This time the postprocess is interrupted.
+
+ >>> stop_dispatcher(dispatcher, postprocess_lock) # INTERRUPT 2
+
+ >>> _ = transaction.begin()
+ >>> job.status == zc.async.interfaces.ACTIVE
+ True
+ >>> job.kwargs['postprocess'].status == zc.async.interfaces.ACTIVE
+ True
+ >>> len(queue)
+ 2
+ >>> queue[0] # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[1] # doctest: +ELLIPSIS
+ <zc.async.job.Job ... ``zc.async.job.Job ... :handleInterrupt()``>
+ >>> queue[0].callable # doctest: +ELLIPSIS
+ <bound method Job.handleInterrupt of <...Job... ``...schedule_parallel...``>>
+ >>> queue[1].callable # doctest: +ELLIPSIS
+ <...method Job.handleInterrupt of <...Job... ``...pause_postprocess(...``>>
+
+Finally we restart without subsequent interruptions. The job runs
+successfully to completion.
+
+ >>> zc.async.dispatcher.clear()
+ >>> zc.async.subscribers.ThreadedDispatcherInstaller(
+ ... poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
+ >>> dispatcher = zc.async.dispatcher.get()
+
+ >>> zc.async.testing.wait_for_result(job)
+ 42
+
+Shut down.
+
+ >>> stop_dispatcher(dispatcher)
Property changes on: zc.async/trunk/src/zc/async/parallel_serial.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: zc.async/trunk/src/zc/async/queue.py
===================================================================
--- zc.async/trunk/src/zc/async/queue.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/queue.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -85,14 +85,17 @@
def activate(self):
if self.activated:
raise ValueError('Already activated')
+ # in exceptional circumstances, the agents may have in-progress jobs
+ # left in them. These will never be worked on, and will block the
+ # agents from using these slots in their "size", until the jobs are
+ # removed. This can be catastrophic. Therefore we iterate over all
+ # the agents to make sure they are all clean before activating.
+ self._clean()
self.activated = datetime.datetime.now(pytz.UTC)
zope.event.notify(
zc.async.interfaces.DispatcherActivated(self))
- def deactivate(self):
- if not self.activated:
- raise ValueError('Not activated')
- self.activated = None
+ def _clean(self):
queue = self.parent
assert zc.async.interfaces.IQueue.providedBy(queue)
for agent in self.values():
@@ -141,9 +144,22 @@
job = agent.pull()
except IndexError:
job = None
+
+ def deactivate(self):
+ if not self.activated:
+ raise ValueError('Not activated')
+ self.activated = None
+ self._clean()
zope.event.notify(
zc.async.interfaces.DispatcherDeactivated(self))
+ def reactivate(self):
+ # this is called *only* by ``poll``. ``poll`` calls ``reactivate``
+ # when ``poll`` discovers that a dispatcher, thought dead, is still
+ # alive.
+ self.activated = datetime.datetime.now(pytz.UTC)
+ zope.event.notify(
+ zc.async.interfaces.DispatcherReactivated(self))
class Queues(zc.async.utils.Dict):
@@ -188,7 +204,13 @@
"if the dispatcher was inappropriately viewed as ``dead`` and "
"deactivated, you should investigate the cause.",
uuid)
- da.activate()
+ # we do this rather than calling ``activate`` because the semantics
+ # are different. ``activate`` is after a true deactivation, and
+ # cleans out the agents and fires off an activation event. This
+ # is inappropriate here, and could easily cause problems.
+ # ``reactivate`` is specifically for this circumstance: a
+ # dispatcher thought dead is discovered to be alive.
+ da.reactivate()
now = datetime.datetime.now(pytz.UTC)
last_ping = da.last_ping.value
if (last_ping is None or
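The ``reactivate`` method above exists so that subscribers can distinguish a false-alarm "death" from a real activation cycle. A minimal sketch of the notify pattern, without the zope.event machinery; the classes here merely mirror the names of the real zc.async.interfaces events:

```python
subscribers = []

def notify(event):
    # zope.event-style dispatch: call every registered subscriber.
    for subscriber in subscribers:
        subscriber(event)

class DispatcherActivated:
    def __init__(self, obj):
        self.object = obj

class DispatcherReactivated:
    # Fired when a dispatcher thought dead turns out to be alive;
    # unlike activation, no agent clean-out should happen here.
    def __init__(self, obj):
        self.object = obj

seen = []
subscribers.append(lambda ev: seen.append(type(ev).__name__))

notify(DispatcherActivated('da'))
notify(DispatcherReactivated('da'))
print(seen)  # -> ['DispatcherActivated', 'DispatcherReactivated']
```

Monitoring code subscribed this way can, for instance, alert on every reactivation, since each one indicates polling fell behind somewhere.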
Modified: zc.async/trunk/src/zc/async/queue.txt
===================================================================
--- zc.async/trunk/src/zc/async/queue.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/queue.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -605,8 +605,8 @@
>>> evs[0].object is da
True
-Now a dispatcher should iterate over agents and look for jobs. There are
-not any agents at the moment.
+Now, after activation, a dispatcher should iterate over agents and look for
+jobs. There are not any agents at the moment.
>>> len(da)
0
@@ -836,18 +836,38 @@
>>> alt_agent.completed.first() is jobD
True
+Activating a dispatcher also cleans out active jobs from its agents. Doing
+it on activation is a fail-safe: normally, agents should not have jobs in
+them after deactivation.
+
+ >>> alt_agent.claimJob() is jobE
+ True
+ >>> len(alt_agent)
+ 1
+ >>> alt_da.activate()
+ >>> len(alt_agent)
+ 0
+ >>> queue[0] is jobA
+ True
+ >>> queue[1].callable == jobB.handleInterrupt
+ True
+ >>> queue[2].callable == jobC.resumeCallbacks
+ True
+ >>> queue[3] is jobE
+ True
+
As an aside, notice that the ``handleInterrupt`` and ``resumeCallbacks`` jobs
have custom error log levels, and custom retry policies.
>>> import logging
+ >>> queue[1].failure_log_level == logging.CRITICAL
+ True
>>> queue[2].failure_log_level == logging.CRITICAL
True
- >>> queue[3].failure_log_level == logging.CRITICAL
+ >>> queue[1].retry_policy_factory is zc.async.job.RetryCommonForever
True
>>> queue[2].retry_policy_factory is zc.async.job.RetryCommonForever
True
- >>> queue[3].retry_policy_factory is zc.async.job.RetryCommonForever
- True
If you have multiple workers, it is strongly suggested that you get the
associated servers connected to a shared time server.
Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/testing.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -201,6 +201,32 @@
else:
assert False, 'no poll!'
+def wait_for_start(job, seconds=6):
+ for i in range(seconds * 10):
+ t = transaction.begin()
+ if job.status == zc.async.interfaces.ACTIVE:
+ break
+ time_sleep(0.1)
+ else:
+ assert False, 'job never started (%s)' % (job.status,)
+
+def wait_for_deactivation(dispatcher, seconds=6):
+ for i in range(seconds * 10):
+ if dispatcher.activated == False:
+ break
+ time_sleep(0.1)
+ else:
+ assert False, 'dispatcher never deactivated'
+
+def wait_for_death(da, seconds=6):
+ for i in range(seconds * 10):
+ _ = transaction.begin()
+ if da.dead:
+ break
+ time_sleep(0.1)
+ else:
+ assert False, 'dispatcher agent never died'
+
def wait_for_result(job, seconds=6):
for i in range(seconds * 10):
t = transaction.begin()
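The new ``wait_for_start``, ``wait_for_deactivation``, and ``wait_for_death`` helpers all share one polling shape. A hypothetical generalization (``wait_for`` is not part of zc.async.testing) might look like:

```python
import time

def wait_for(condition, seconds=6, interval=0.1,
             message='condition never met'):
    # Poll ``condition`` until it returns a true value, checking every
    # ``interval`` seconds for up to ``seconds`` total, matching the
    # loop shape of the helpers in the hunk above.
    for _ in range(int(round(seconds / interval))):
        if condition():
            return
        time.sleep(interval)
    raise AssertionError(message)
```

Each concrete helper would then just supply its own predicate, e.g. ``wait_for(lambda: da.dead, message='dispatcher agent never died')``, beginning a fresh transaction inside the predicate where current ZODB state is needed.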
Modified: zc.async/trunk/src/zc/async/tests.py
===================================================================
--- zc.async/trunk/src/zc/async/tests.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/tests.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -143,6 +143,7 @@
'agent.txt',
'dispatcher.txt',
'subscribers.txt',
+ 'parallel_serial.txt',
'twisted.txt',
'README_1.txt',
'README_2.txt',
Modified: zc.async/trunk/src/zc/async/threadlocal.py
===================================================================
--- zc.async/trunk/src/zc/async/threadlocal.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/threadlocal.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -55,6 +55,7 @@
job = None
dispatcher = None
+ name = None
def getJob(self):
return self.job
@@ -68,6 +69,9 @@
def getReactor(self):
return self.dispatcher.reactor
+ def getAgentName(self):
+ return self.name
+
def setLiveAnnotation(self, name, value, job=None):
if self.job is None or self.dispatcher.reactor is None:
raise ValueError('not initialized')
Modified: zc.async/trunk/src/zc/async/tips.txt
===================================================================
--- zc.async/trunk/src/zc/async/tips.txt 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/tips.txt 2008-09-04 02:53:09 UTC (rev 90777)
@@ -5,6 +5,11 @@
General Tips and Tricks
=======================
+* If you have multiple machines working as zc.async dispatchers, it is
+ strongly suggested that you get the associated servers connected to a shared
+ time server. You generally don't want your machines to disagree by more than
+ a few seconds.
+
* Avoid long transactions if possible. Really try to avoid long transactions
involving frequently written objects. One possible strategy is to divide up
your code into a job for low-conflict tasks and one or more jobs for
Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py 2008-09-04 00:52:16 UTC (rev 90776)
+++ zc.async/trunk/src/zc/async/utils.py 2008-09-04 02:53:09 UTC (rev 90777)
@@ -26,9 +26,12 @@
import pytz
import zope.bforest.periodic
+import zc.async.interfaces
-EXPLOSIVE_ERRORS = (SystemExit, KeyboardInterrupt)
+EXPLOSIVE_ERRORS = (SystemExit, KeyboardInterrupt,
+ zc.async.interfaces.ReassignedError)
+
SYSTEM_ERRORS = (ZEO.Exceptions.ClientDisconnected,
ZODB.POSException.POSKeyError)
@@ -244,7 +247,6 @@
tm.abort()
backoff_ct += 1
if backoff_ct == 1:
- # import pdb; pdb.set_trace()
log.log(level,
'first error while %s; will continue in %d seconds',
identifier, backoff, exc_info=True)