[Checkins] SVN: zc.async/branches/dev/src/zc/async/ checkpoint for work on readme; some other small changes

Gary Poster gary at zope.com
Thu Mar 20 19:59:06 EDT 2008


Log message for revision 84809:
  checkpoint for work on readme; some other small changes

Changed:
  U   zc.async/branches/dev/src/zc/async/README.txt
  A   zc.async/branches/dev/src/zc/async/README_2.txt
  U   zc.async/branches/dev/src/zc/async/agent.py
  U   zc.async/branches/dev/src/zc/async/dispatcher.py
  U   zc.async/branches/dev/src/zc/async/dispatcher.txt
  U   zc.async/branches/dev/src/zc/async/interfaces.py
  U   zc.async/branches/dev/src/zc/async/job.py
  U   zc.async/branches/dev/src/zc/async/queue.py
  U   zc.async/branches/dev/src/zc/async/queue.txt
  U   zc.async/branches/dev/src/zc/async/tests.py

-=-
Modified: zc.async/branches/dev/src/zc/async/README.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README.txt	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/README.txt	2008-03-20 23:59:06 UTC (rev 84809)
@@ -1,27 +1,36 @@
-========
+~~~~~~~~
 zc.async
-========
+~~~~~~~~
 
+.. contents::
+
+============
+Introduction
+============
+
 Goals
 =====
 
-The zc.async package provides a way to make scalable asynchronous application
-calls.  Here are some example core use cases.
+The zc.async package provides a way to schedule jobs, particularly
+those working within the context of the ZODB, to be performed
+out-of-band from your current thread.  The job might be done in another
+thread or another process.  Here are some example core use cases.
 
-- You want to let users create PDFs through your application.  This can take
-  quite a bit of time, and will use both system resources and one of the
-  precious application threads until it is done.  Naively done, six or seven
-  simultaneous PDF requests could make your application unresponsive to any
-  other users.
+- You want to let users do something that requires a lot of system
+  resources from your application, such as creating a large PDF.  Naively
+  done, six or seven simultaneous PDF requests will consume your
+  application thread pool and could make your application unresponsive to
+  any other users.
 
 - You want to let users spider a web site; communicate with a credit card
   company; query a large, slow LDAP database on another machine; or do
-  some other action that generates network requests from the server. 
-  Again, if something goes wrong, several requests could make your
-  application unresponsive.
+  some other action that generates network requests from the server.
+  System resources might not be a problem, but, again, if something goes
+  wrong, several requests could make your application unresponsive.
 
 - Perhaps because of excessive conflict errors, you want to serialize work
-  that can be done asynchronously, such as cataloging data.
+  that can be done asynchronously, such as updating a single data structure
+  like a catalog index.
 
 - You want to decompose and parallelize a single job across many machines so
   it can be finished faster.
@@ -34,6 +43,9 @@
 expensive processes, on demand.  None of them are explicitly about scheduled
 tasks, though scheduled tasks can benefit from this package.
 
+Multiple processes can claim and perform jobs.  Jobs can be (manually)
+decomposed for serial or parallel processing of the component parts.
+
 History
 =======
 
@@ -45,17 +57,18 @@
 Design Overview
 ===============
 
+-----
 Usage
 -----
 
-Looking at the design from the perspective of regular usage, code
-obtains a ``queue``, which is a place to queue tasks to be performed
-asynchronously.  Code calls ``put`` on the queue to register a job.  The
-job must be a pickleable callable: a global function, a callable
-persistent object, a method of a persistent object, or a special
-zc.async.job.Job object, discussed later.  The job by default is
+Looking at the design from the perspective of regular usage, your code
+obtains a ``queue``, which is a place to queue jobs to be performed
+asynchronously.  Your application calls ``put`` on the queue to register
+a job.  The job must be a pickleable callable: a global function, a
+callable persistent object, a method of a persistent object, or a
+special zc.async.job.Job object, discussed later.  The job by default is
 registered to be performed as soon as possible, but can be registered to
-be called later.
+be called at a certain time.
 
 The ``put`` call will return a zc.async.job.Job object.  This
 object represents both the callable and its deferred result.  It has
@@ -63,276 +76,146 @@
 performing the job.  An example spelling for registering a job might be
 ``self.pending_result = queue.put(self.performSpider)``.  The returned
 object can be simply persisted and polled to see when the job
-is complete; or it can be set to do tasks when it completes.
+is complete; or it can be configured to do additional work when it
+completes.
 
+---------
 Mechanism
 ---------
 
-In order for this to work, components must be set up to perform the
-tasks. This part of the design has three additional kinds of participants:
-agents, dispatchers, and reactors.
+Multiple processes, typically spread across multiple machines, can use
+ZEO to connect to the queue and claim and perform work.  As with other
+collections of processes that share a database with ZEO, these processes
+generally should share the same software (though some variations on this
+constraint might be theoretically possible).
 
-A dispatcher is in charge of dispatching queued work for a given
-process.  It works with a mapping of queues and a reactor.  It has a
-universally unique identifier (UUID), which is usually an identifier of
-the application instance in which it is running.
+In addition to a database connection and the necessary software, a
+process that should claim and perform work needs a ``dispatcher`` with a
+``reactor`` to provide a heartbeat.  The dispatcher will rely on one or more
+persistent ``agents`` in the queue (in the database) to determine which jobs
+it should perform.
 
-A reactor is something that can provide an eternal loop, or heartbeat,
+A ``dispatcher`` is in charge of dispatching queued work for a given
+process to worker threads.  It works with one or more queues and a
+single reactor.  It has a universally unique identifier (UUID), which is
+usually an identifier of the application instance in which it is
+running.  The dispatcher starts jobs in dedicated threads.
+
+A ``reactor`` is something that can provide an eternal loop, or heartbeat,
 to power the dispatcher.  It can be the main twisted reactor (in the
 main thread); another instance of a twisted reactor (in a child thread);
-or any object that implements a very small subset of the twisted reactor
-interface (see zc.async.interfaces.IReactor).
+or any object that implements a small subset of the twisted reactor
+interface (see discussion in dispatcher.txt, and example testing reactor in
+testing.py, used below).
 
-An agent is a persistent object in a queue that is associated with a
+An ``agent`` is a persistent object in a queue that is associated with a
 dispatcher and is responsible for picking jobs and keeping track of
 them. Zero or more agents within a queue can be associated with a
-dispatcher.  Each agent for a given dispatcher is identified uniquely
-with a name [#identifying_agent]_.
+dispatcher.  Each agent for a given dispatcher in a given queue is
+identified uniquely with a name [#identifying_agent]_.
 
 Generally, these work together as follows.  The reactor calls the
-dispatcher with itself and the root of the database in which the queues
-are collected. The dispatcher tries to find the mapping of queues in the
-root under a key of ``zc.async``.  If it finds the mapping, it iterates
+dispatcher. The dispatcher tries to find the mapping of queues in the
+database root under a key of ``zc.async`` (see constant
+zc.async.interfaces.KEY).  If it finds the mapping, it iterates
 over the queues (the mapping's values) and asks each queue for the
 agents associated with the dispatcher's UUID.  The dispatcher then is
 responsible for seeing what jobs its agents want to do from the queue,
 and providing threads and connections for the work to be done.  The
-dispatcher then asks the reactor to call again in a few seconds.
+dispatcher then asks the reactor to call itself again in a few seconds.
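The polling handshake described above can be sketched in a few lines of plain
Python (a simplified illustration only: the dictionary layout, the stub
objects, and the ``claim`` method are hypothetical, not the real dispatcher
API):

```python
KEY = 'zc.async'  # the key constant, as in zc.async.interfaces.KEY

def poll_once(root, uuid):
    """One heartbeat: find the queues mapping under KEY, ask each queue
    for this dispatcher's agents, and record what each agent claimed."""
    queues = root.get(KEY)
    if queues is None:
        # no mapping yet: not an error, there is simply nothing to do
        return {}
    report = {}
    for queue_name, queue in queues.items():
        # hypothetical lookup of the agents registered for this UUID
        agents = queue.get(uuid, {})
        report[queue_name] = dict(
            (agent_name, agent.claim())  # hypothetical claim() method
            for agent_name, agent in agents.items())
    return report
```

The returned report mirrors the general shape of the per-poll mappings kept
by the real dispatcher: queue keys mapping to per-agent data.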
 
-Set Up
-======
+Reading More
+============
 
-Before we can make any calls, then, we need to set up a dispatcher, agent,
-reactor, and queue.
+This document continues with four other main sections: `Usage`_,
+`Configuration Without Zope 3`_, `Configuration With Zope 3`_, and
+`Tips and Tricks`_.
 
-Reactor
--------
+Other documents in the package are primarily geared toward maintainer
+documentation, though the author has tried to make them readable and
+understandable.
 
-We'll use a test reactor that we can control.
+=====
+Usage
+=====
 
-    >>> import zc.async.testing
-    >>> reactor = zc.async.testing.Reactor()
-    >>> reactor.start() # this mokeypatches datetime.datetime.now 
+Overview and Basics
+===================
 
-If you look at this reactor in the testing module, you can see how small
-the necessary reactor interface is.  As mentioned above, many kinds of
-reactors can work: the main Twisted reactor, a different Twisted reactor
-instance in a child thread, or even your own reactor.  We'll have some
-quick real-word examples later (XXX).  For our purposes in controling
-our examples, this testing reactor has a special method that lets us
-call ``reactor.time_flies(seconds)`` to perform all calls that should
-happen in the next *seconds*.
+The basic usage of zc.async does not depend on a particular configuration
+of the back-end mechanism for getting the jobs done.  Moreover, on some
+teams, configuring zc.async will be the responsibility of one person or
+group, while the resulting service is available to the code of all team
+members.  Therefore, we begin our detailed discussion with regular usage,
+assuming configuration has already happened.  Subsequent sections discuss
+configuring zc.async with and without Zope 3.
 
-Dispatcher
-----------
+So, let's assume we have a queue installed into a ZODB, with hidden
+dispatchers, reactors and agents all waiting to fulfill jobs placed into
+the queue.  We start with a connection object, ``conn``, and some
+convenience functions introduced along the way that help us simulate
+time passing and work being done [#usageSetUp]_.
 
-We need to instantiate the dispatcher with a reactor and a DB.  We have the
-reactor, so here is the DB.  We use a FileStorage rather than a
-MappingStorage variant typical in tests and examples because we want
-MVCC.
+-------------------
+Obtaining the queue
+-------------------
 
-    >>> import ZODB.FileStorage
-    >>> storage = ZODB.FileStorage.FileStorage(
-    ...     'HistoricalConnectionTests.fs', create=True)
-    >>> from ZODB.DB import DB 
-    >>> db = DB(storage) 
-    >>> conn = db.open()
-    >>> root = conn.root()
+First, how do we get the queue?  Your installation may have some
+conveniences.  For instance, the Zope 3 configuration described below
+makes it possible to get the primary queue with a call to
+``zope.component.getUtility(zc.async.interfaces.IQueue)``.
 
-The dispatcher will look for a UUID utility.
-    
-    >>> from zc.async.instanceuuid import UUID
-    >>> import zope.component
-    >>> zope.component.provideUtility(
-    ...     UUID, zc.async.interfaces.IUUID, '')
+But failing that, queues are always expected to be in a zc.async.queue.Queues
+mapping found off the ZODB root under a key defined by the constant
+zc.async.interfaces.KEY.
 
-Now we can instantiate.
-
-    >>> import zc.async.dispatcher
-    >>> dispatcher = zc.async.dispatcher.Dispatcher(reactor, db)
-
-The dispatcher knows its UUID.  This is usually a UUID of the process,
-and of the instance.  The instance UUID, in hex, is stored in
-INSTANCE_HOME/etc/uuid.txt
-
-    >>> import uuid
-    >>> import os
-    >>> f = open(os.path.join(
-    ...     os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt'))
-    >>> uuid_hex = f.readline().strip()
-    >>> f.close()
-    >>> uuid = uuid.UUID(uuid_hex)
-    >>> dispatcher.UUID == uuid
-    True
-
-(The uuid.txt file is intended to stay in the instance home as a
-persistent identifier.)
-
-If you don't want this default UUID, you can pass one in to the constructor.
-
-The dispatcher also has a configuration value, ``poll_interval``.  This
-value indicates how often in seconds, approximately, the dispatcher
-should poll queues for work.  It defaults to 5 seconds.
-
-    >>> dispatcher.poll_interval
-    5
-
-Now we'll activate the dispatcher and let it poll.
-
-    >>> dispatcher.activate()
-    >>> reactor.time_flies(1)
-    1
-
-Note that the dispatcher didn't complain even though the ``zc.async``
-key does not exist in the root.
-
-The dispatcher has tried to poll once, to no effect.
-
-    >>> import datetime
-    >>> import pytz
-    >>> len(dispatcher.polls)
-    1
-    >>> poll = dispatcher.polls.first()
-    >>> poll.utc_timestamp <= datetime.datetime.utcnow()
-    True
-    >>> poll
-    {}
-
-The ``dispatcher.polls.first()`` poll always contains information about
-the dispatcher's most recent poll, if any.  The ``utc_timestamp`` is the
-UTC timestamp of the (end of the) last poll. The ``polls`` object is a
-(non-persistent) data structure documenting approximately the last 24 hours
-of polls.  View and change this with the ``period`` value of the polls
-object.
-
-    >>> dispatcher.polls.period
-    datetime.timedelta(1)
-
-Each poll is represented with a mapping, with keys of queue keys in
-the root queue mapping.  The values are mappings of agents that were
-polled, where the key is the agent name and the value is a data object
-describing what was done for the agent.
-
-Queue
------
-
-Now let's create the mapping of queues, and a single queue.
-
-    >>> import zc.async.queue
     >>> import zc.async.interfaces
-    >>> mapping = root[zc.async.interfaces.KEY] = zc.async.queue.Queues()
-    >>> queue = mapping[''] = zc.async.queue.Queue()
-    >>> import transaction
-    >>> transaction.commit()
-
-Now we have created everything except an agent for this dispatcher in the
-queue.  If we let the reactor run, the queue will be checked, but nothing
-will have been done, because no agents were found.
-
-    >>> reactor.time_flies(dispatcher.poll_interval)
-    1
-    >>> len(dispatcher.polls)
-    2
-    >>> import pprint
-    >>> pprint.pprint(dispatcher.polls.first())
-    {'': {}}
-
-Well, actually, a bit more than nothing was done.
-
-- The dispatcher registered and activated itself with the queue.
-
-- The queue fired events to announce the dispatcher's registration and
-  activation.  We could have registered subscribers for either or both
-  of these events to create agents.
-  
-  Note that the dispatcher in queue.dispatchers is a persistent
-  representative of the actual dispatcher: they are different objects.
-
-- Lastly, the dispatcher made its first ping.  A ping means that the
-  dispatcher changes a datetime to record that it is alive.  
-
-  The dispatcher needs to update its last_ping after every ``ping_interval``
-  seconds.  If it has not updated the last_ping after ``ping_death_interval``
-  then the dispatcher is considered to be dead, and active jobs in the
-  dispatcher's agents are ended (and given a chance to respond to that status
-  change, so they can put themselves back on the queue to be restarted if
-  desired).
-
-These are demonstrated in dispatcher.txt.
-
-Agent
------
-
-As mentioned above, we could have registered a subscriber for either or
-both of the registration and activation events to create agents.  For
-this example, though, we'll add an agent manually.
-
-Agents are responsible for getting the next job from the queue and for
-specifying how many worker threads they should use at once.  We'll use
-the defaults for now to create an agent that simply gets the next
-available FIFO job, and has a maximum of three worker threads.
-
-    >>> import zc.async.agent
-    >>> agent = zc.async.agent.Agent()
-    >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
-    >>> agent.chooser is zc.async.agent.chooseFirst
+    >>> zc.async.interfaces.KEY
+    'zc.async'
+    >>> root = conn.root()
+    >>> queues = root[zc.async.interfaces.KEY]
+    >>> import zc.async.queue
+    >>> isinstance(queues, zc.async.queue.Queues)
     True
-    >>> agent.size
-    3
-    >>> transaction.commit()
 
-Now if we poll, the agent will be included, thought there are still no jobs
-to be done.
+As the name implies, ``queues`` is a collection of queues.  It's
+possible to have multiple queues, as a tool to distribute and control
+work.  We will assume a convention of a queue being available under the ''
+(empty string) key.  This is followed in the Zope 3 configuration discussed
+below.
 
-    >>> reactor.time_flies(dispatcher.poll_interval)
-    1
-    >>> len(dispatcher.polls)
-    3
-    >>> dispatcher.polls.first()
-    {'': {'main': {'new_jobs': [], 'error': None, 'len': 0, 'size': 3}}}
+    >>> queues.keys()
+    ['']
+    >>> queue = queues['']
 
-It took awhile to explain it, but we now have a simple set up: a queue,
-a dispatcher with reactor, and an agent.  Let's start doing the easy and
-fun part: making some asynchronous calls!
+---------
+queue.put
+---------
 
-Basic Usage: IQueue.put
-=========================
+Now we want to actually get some work done.  The simplest case is simple
+to perform: pass a persistable callable to the queue's ``put`` method and
+commit the transaction.
 
-The simplest case is simple to perform: pass a persistable callable to the
-queue's `put` method.  We'll need some adapters to make this happen
-[#setup_adapters]_.
-
     >>> def send_message():
     ...     print "imagine this sent a message to another machine"
     >>> job = queue.put(send_message)
+    >>> import transaction
     >>> transaction.commit()
 
-Now we need to wait for the poll to happen again, and then wait for the job
-to be completed in a worker thread.
+The ``put`` returned a job.  Now we need to wait for the job to be
+performed.  We would normally do this by really waiting.  For our
+examples, we will use a helper function called ``wait_for`` to wait for
+the job to be completed [#wait_for]_.
 
-    >>> import time
-    >>> def wait_for(*jobs, **kwargs):
-    ...     reactor.time_flies(dispatcher.poll_interval) # starts thread
-    ...     # now we wait for the thread
-    ...     for i in range(kwargs.get('attempts', 10)):
-    ...         while reactor.time_passes():
-    ...             pass
-    ...         transaction.begin()
-    ...         for j in jobs:
-    ...             if j.status != zc.async.interfaces.COMPLETED:
-    ...                 break
-    ...         else:
-    ...             break
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         print 'TIME OUT'
-    ...
     >>> wait_for(job)
     imagine this sent a message to another machine
 
 We also could have used the method of a persistent object.  Here's another
 quick example.
 
+First we define a simple persistent.Persistent subclass and put it in the
+database [#commit_for_multidatabase]_.
+
     >>> import persistent
     >>> class Demo(persistent.Persistent):
     ...     counter = 0
@@ -341,16 +224,28 @@
     ...
     >>> root['demo'] = Demo()
     >>> transaction.commit()
+
+Now we can put the ``demo.increase`` method in the queue.
+
     >>> root['demo'].counter
     0
     >>> job = queue.put(root['demo'].increase)
     >>> transaction.commit()
+
     >>> wait_for(job)
     >>> root['demo'].counter
     1
 
 The method was called, and the persistent object modified!
 
+To reiterate, only persistent callables and the methods of persistent
+objects can be used.  This rules out, for instance, closures.  As we'll
+see below, the job instance can help us out there.
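The closure restriction is plain Python pickling at work; this standalone
sketch (not part of zc.async) shows why a closure cannot be queued while a
module-level function can:

```python
import pickle

def module_level_job():
    # module-level functions pickle by reference to their qualified name
    return 42

def make_closure():
    x = 42
    def closure_job():  # defined inside a function: not importable by name
        return x
    return closure_job

pickle.dumps(module_level_job)  # succeeds: pickled by name

try:
    pickle.dumps(make_closure())
except Exception:  # PicklingError, or AttributeError on newer Pythons
    print("closures cannot be pickled")
```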
+
+---------------
+Scheduled Calls
+---------------
+
 You can also pass a datetime.datetime to schedule a call.  A datetime
 without a timezone is considered to be in the UTC timezone.
 
@@ -358,49 +253,32 @@
     >>> import datetime
     >>> import pytz
     >>> datetime.datetime.now(pytz.UTC)
-    datetime.datetime(2006, 8, 10, 15, 44, 43, 211, tzinfo=<UTC>)
+    datetime.datetime(2006, 8, 10, 15, 44, 33, 211, tzinfo=<UTC>)
     >>> job = queue.put(
-    ...     send_message, datetime.datetime(
+    ...     send_message, begin_after=datetime.datetime(
     ...         2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
     >>> job.begin_after
     datetime.datetime(2006, 8, 10, 15, 56, tzinfo=<UTC>)
     >>> transaction.commit()
-    >>> wait_for(job, attempts=1) # +5 virtual seconds
+    >>> wait_for(job, attempts=2) # +5 virtual seconds
     TIME OUT
-    >>> wait_for(job, attempts=1) # +5 virtual seconds
+    >>> wait_for(job, attempts=2) # +5 virtual seconds
     TIME OUT
+    >>> datetime.datetime.now(pytz.UTC)
+    datetime.datetime(2006, 8, 10, 15, 44, 43, 211, tzinfo=<UTC>)
+
     >>> zc.async.testing.set_now(datetime.datetime(
     ...     2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
-    >>> wait_for(job) # +5 virtual seconds
+    >>> wait_for(job)
     imagine this sent a message to another machine
     >>> datetime.datetime.now(pytz.UTC) >= job.begin_after
     True
 
 If you set a time that has already passed, it will be run as if it had
-been set to run immediately.
+been set to run as soon as possible [#already_passed]_... unless the job
+has already timed out, in which case the job fails with an
+abort [#already_passed_timed_out]_.
 
-    >>> t = transaction.begin()
-    >>> job = queue.put(
-    ...     send_message, datetime.datetime(2006, 8, 10, 15, tzinfo=pytz.UTC))
-    >>> transaction.commit()
-    >>> wait_for(job)
-    imagine this sent a message to another machine
-
-...unless the job has already timed out, in which case the job fails
-with an abort.
-
-    >>> t = transaction.begin()
-    >>> job = queue.put(
-    ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
-    >>> transaction.commit()
-    >>> wait_for(job)
-    >>> job.result
-    <twisted.python.failure.Failure zc.async.interfaces.AbortedError>
-    >>> import sys
-    >>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
-    Traceback (most recent call last):
-    Failure: zc.async.interfaces.AbortedError:
-
 The queue's `put` method is the essential API.  Other methods are used
 to introspect, but are not needed for basic usage.
 
@@ -410,6 +288,10 @@
 Jobs
 ====
 
+--------
+Overview
+--------
+
 The result of a call to `put` returns an IJob.  The
 job represents the pending result.  This object has a lot of
 functionality that's explored in other documents in this package, and
@@ -421,9 +303,6 @@
 - You can specify that the job should be run serially with others
   of a given identifier.
 
-- You can specify that the job may or may not be run by given
-  workers (identifying them by their UUID).
-
 - You can specify other calls that should be made on the basis of the
   result of this call.
 
@@ -438,6 +317,10 @@
   way to safely communicate exceptions across connections and machines
   and processes.
 
+-------
+Results
+-------
+
 So here's a simple story.  What if you want to get a result back from a
 call?  Look at the job.result after the call is COMPLETED.
 
@@ -458,13 +341,17 @@
     >>> job.status == zc.async.interfaces.COMPLETED
     True
 
-What's more, you can pass a Job to the `put` call.  This means that
-you aren't constrained to simply having simple non-argument calls
-performed asynchronously, but you can pass a job with a call,
-arguments, and keyword arguments.  Here's a quick example.  We'll use
-the demo object, and its increase method, that we introduced above, but
-this time we'll include some arguments [#job]_.
+--------
+Closures
+--------
 
+What's more, you can pass a Job to the `put` call.  This means that you
+aren't constrained to simply having simple non-argument calls performed
+asynchronously, but you can pass a job with a call, arguments, and
+keyword arguments--effectively, a kind of closure.  Here's a quick example. 
+We'll use the demo object, and its increase method, that we introduced
+above, but this time we'll include some arguments [#job]_.
+
 With placeful arguments:
 
     >>> t = transaction.begin()
@@ -488,6 +375,10 @@
 
 Note that arguments to these jobs can be any persistable object.
 
+--------
+Failures
+--------
+
 What happens if a call raises an exception?  The return value is a Failure.
 
     >>> def I_am_a_bad_bad_function():
@@ -509,45 +400,108 @@
     exceptions.NameError: global name 'foo' is not defined
     <BLANKLINE>
 
+---------
+Callbacks
+---------
+
+You can register callbacks to handle the result of a job, whether a
+Failure or another result.  These callbacks can be thought of as the
+"except", "else", or "finally" clauses of a "try" statement.  Each
+callback receives the job's current result as input, and its output
+becomes the job's new result (and therefore the input of the next
+callback, if any).
+
+Note that, during execution of a callback, there is no guarantee that
+the callback will be processed on the same machine as the main call.  Also,
+the ``local`` functions will not work.
+
+Here's a simple example of reacting to a success.
+
+    >>> def I_scribble_on_strings(string):
+    ...     return string + ": SCRIBBLED"
+    ...
+    >>> job = queue.put(imaginaryNetworkCall)
+    >>> callback = job.addCallback(I_scribble_on_strings)
+    >>> transaction.commit()
+    >>> wait_for(job)
+    >>> job.result
+    '200 OK'
+    >>> callback.result
+    '200 OK: SCRIBBLED'
+
+Here's a more complex example of handling a Failure, and then chaining
+a subsequent callback.
+
+    >>> def I_handle_NameErrors(failure):
+    ...     failure.trap(NameError) # see twisted.python.failure.Failure docs
+    ...     return 'I handled a name error'
+    ...
+    >>> job = queue.put(I_am_a_bad_bad_function)
+    >>> callback1 = job.addCallbacks(failure=I_handle_NameErrors)
+    >>> callback2 = callback1.addCallback(I_scribble_on_strings)
+    >>> transaction.commit()
+    >>> wait_for(job)
+    >>> job.result
+    <twisted.python.failure.Failure exceptions.NameError>
+    >>> callback1.result
+    'I handled a name error'
+    >>> callback2.result
+    'I handled a name error: SCRIBBLED'
+
 zc.async.local
 ==============
 
 Jobs always run their callables in a thread, within the context of a
-connection to the ZODB. The callables have access to three special
+connection to the ZODB. The callables have access to five special
 thread-local functions if they need them for special uses.  These are
 available off of zc.async.local.
 
-zc.async.local.getJob()
-    The getJob function can be used to examine the job, to get
-    a connection off of _p_jar, to get the queue into which the job
+``zc.async.local.getJob()``
+    The ``getJob`` function can be used to examine the job, to get
+    a connection off of ``_p_jar``, to get the queue into which the job
     was put, or other uses.
 
-zc.async.local.setLiveAnnotation(name, value[, job])
-    The setLiveAnnotation tells the agent to set an annotation on a job,
+``zc.async.local.setLiveAnnotation(name, value, job=None)``
+    The ``setLiveAnnotation`` tells the agent to set an annotation on a job,
     by default the current job, *in another connection*.  This makes it
     possible to send messages about progress or for coordination while in the
-    middle of other work.  *For safety, do not send mutables or a persistent
-    object.*
+    middle of other work.
+    
+    As a simple rule, only send immutable objects like strings or
+    numbers as values [#setLiveAnnotation]_.
 
-zc.async.local.getLiveAnnotation(name[, job[, default=None[, block=False]]])
-    The getLiveAnnotation tells the agent to get an annotation for a job,
+``zc.async.local.getLiveAnnotation(name, default=None, timeout=0, poll=1, job=None)``
+    The ``getLiveAnnotation`` tells the agent to get an annotation for a job,
     by default the current job, *from another connection*.  This makes it
     possible to send messages about progress or for coordination while in the
-    middle of other work.  *For safety, if you get a mutable, do not mutate
-    it.*  If the value is a persistent object, it will not be returned: you
-    will get a Fault object instead.  If the ``block`` argument is True,
-    the function will wait until an annotation of the given name is available.
-    Otherwise, it will return the ``default`` if the name is not present in the
-    annotations.
+    middle of other work.  
+    
+    As a simple rule, only ask for annotation values that will be
+    immutable objects like strings or numbers [#getLiveAnnotation]_.
 
-The last two functions can even be passed to a thread that does not have
-a connection.  Note that this is not intended as a way to communicate across
-threads on the same process, but across processes.
+    If the ``timeout`` argument is set to a positive float or int, the
+    function will wait at least that number of seconds for an
+    annotation of the given name to become available.  Otherwise, it
+    will return the ``default`` if the name is not present in the
+    annotations.  The ``poll`` argument specifies approximately how
+    often to poll for the annotation, in seconds; as the timeout
+    approaches, the next poll interval will be min(poll, seconds
+    remaining until timeout).
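The timeout/poll interplay can be sketched with a standalone helper
(hypothetical; not zc.async's actual implementation):

```python
import time

def wait_for_annotation(get, name, default=None, timeout=0, poll=1):
    """Keep polling ``get`` until the annotation appears or ``timeout``
    seconds elapse, shrinking the final sleep so the deadline is honored."""
    marker = object()
    deadline = time.time() + timeout
    while True:
        value = get(name, marker)
        if value is not marker:
            return value
        remaining = deadline - time.time()
        if remaining <= 0:
            return default
        # never sleep past the deadline
        time.sleep(min(poll, remaining))
```

With the default ``timeout`` of 0, a missing annotation returns ``default``
immediately, matching the behavior described above.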
 
-Let's give these a whirl.  We will write a function that examines the
-job's state while it is being called, and sets the state in an
-annotation, then waits for our flag to finish.
+``zc.async.local.getReactor()``
+    The ``getReactor`` function returns the job's dispatcher's reactor.  The
+    ``getLiveAnnotation`` and ``setLiveAnnotation`` functions use this,
+    along with the zc.twist package, to work their magic; if you are feeling
+    adventurous, you can do the same.
 
+``zc.async.local.getDispatcher()``
+    The ``getDispatcher`` function returns the job's dispatcher.  This might
+    be used to analyze its non-persistent poll data structure, for instance
+    (described later in configuration discussions).
+
+Let's give the first three a whirl.  We will write a function that
+examines the job's state while it is being called, and sets the state in
+an annotation, then waits for our flag to finish.
+
     >>> def annotateStatus():
     ...     zc.async.local.setLiveAnnotation(
     ...         'zc.async.test.status',
@@ -582,56 +536,12 @@
     >>> job.result
     42
 
-Job Callbacks
-=============
+``getReactor`` and ``getDispatcher`` are for advanced use cases and are not
+explored further here.
 
-You can register callbacks to handle the result of a job, whether a
-Failure or another result.  These callbacks can be thought of as the
-"except" "else" or "finally" clauses of a "try" statement.  Each
-callback receives the job's current result as input, and its output
-becomes the job's new result (and therefore the input of the next
-callback, if any).
+Job Quotas
+==========
 
-Note that, during execution of a callback, there is no guarantee that
-the callback will be processed on the same machine as the main call.  Also,
-the ``local`` functions will not work.
-
-Here's a simple example of reacting to a success.
-
-    >>> def I_scribble_on_strings(string):
-    ...     return string + ": SCRIBBLED"
-    ...
-    >>> job = queue.put(imaginaryNetworkCall)
-    >>> callback = job.addCallback(I_scribble_on_strings)
-    >>> transaction.commit()
-    >>> wait_for(job)
-    >>> job.result
-    '200 OK'
-    >>> callback.result
-    '200 OK: SCRIBBLED'
-
-Here's a more complex example of handling a Failure, and then chaining
-a subsequent callback.
-
-    >>> def I_handle_NameErrors(failure):
-    ...     failure.trap(NameError) # see twisted.python.failure.Failure docs
-    ...     return 'I handled a name error'
-    ...
-    >>> job = queue.put(I_am_a_bad_bad_function)
-    >>> callback1 = job.addCallbacks(failure=I_handle_NameErrors)
-    >>> callback2 = callback1.addCallback(I_scribble_on_strings)
-    >>> transaction.commit()
-    >>> wait_for(job)
-    >>> job.result
-    <twisted.python.failure.Failure exceptions.NameError>
-    >>> callback1.result
-    'I handled a name error'
-    >>> callback2.result
-    'I handled a name error: SCRIBBLED'
-
-Enforced Serialization
-======================
-
 One class of asynchronous jobs are ideally serialized.  For instance,
 you may want to reduce or eliminate the chance of conflict errors when
 updating a text index.  One way to do this kind of serialization is to
@@ -639,6 +549,7 @@
 
 For example, let's first show two non-serialized jobs running at the
 same time, and then two serialized jobs created at the same time.
+The first part of the example does not use ``quota_names``, to show a contrast.
 
 For our parallel jobs, we'll do something that would create a deadlock
 if they were serial.  Notice that we are mutating the job arguments after
@@ -648,7 +559,7 @@
     ...     zc.async.local.setLiveAnnotation(
     ...         'zc.async.test.flag', True)
     ...     zc.async.local.getLiveAnnotation(
-    ...         'zc.async.test.flag', other, block=True)
+    ...         'zc.async.test.flag', job=other, timeout=0.4, poll=0)
     ...
     >>> job1 = queue.put(waitForParallel)
     >>> job2 = queue.put(waitForParallel)
@@ -660,10 +571,16 @@
     True
     >>> job2.status == zc.async.interfaces.COMPLETED
     True
+    >>> job1.result is job2.result is None
+    True
 
 On the other hand, for our serial jobs, we'll do something that would fail
-if it were parallel.
+if it were parallel.  We'll rely on ``quota_names``.
 
+Quotas must be configured on the queue, and so verge on configuration,
+which is not the focus of this section.  However, they also affect
+usage, so we introduce them here.
+
     >>> def pause(other):
     ...     zc.async.local.setLiveAnnotation(
     ...         'zc.async.test.flag', True)
@@ -672,15 +589,21 @@
     ...
     >>> job1 = queue.put(pause)
     >>> job2 = queue.put(imaginaryNetworkCall)
+
+You can't put a name in ``quota_names`` unless the quota has been created
+in the queue.
+
     >>> job1.quota_names = ('test',)
     Traceback (most recent call last):
     ...
-    ValueError: quota name not defined in queue
+    ValueError: ('unknown quota name', 'test')
     >>> queue.quotas.create('test')
     >>> job1.quota_names = ('test',)
     >>> job2.quota_names = ('test',)
+
+Now we can see the two jobs being performed serially.
+
     >>> job1.args.append(job2)
-    >>> job2.args.append(job1)
     >>> transaction.commit()
     >>> reactor.time_flies(dispatcher.poll_interval)
     1
@@ -698,7 +621,15 @@
     >>> transaction.commit()
     >>> wait_for(job1)
     >>> wait_for(job2)
+    >>> print job1.result
+    None
+    >>> print job2.result
+    200 OK
 
+Quotas can be configured with limits greater than one, if desired.  This
+may be valuable when a needed resource is only available in limited
+numbers at a time.
+
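The counting behind a sized quota can be pictured with a small stand-alone sketch.  This is plain Python, not the zc.async implementation; the ``Quota`` class and its method names are illustrative only.

```python
class Quota:
    """Toy counter mimicking a sized quota: at most `size` concurrent jobs."""

    def __init__(self, size=1):
        self.size = size
        self.active = []

    @property
    def filled(self):
        return len(self.active) >= self.size

    def claim(self, job):
        # A job may only start while the quota has a free slot.
        if self.filled:
            return None
        self.active.append(job)
        return job

    def release(self, job):
        self.active.remove(job)

quota = Quota(size=2)  # conceptually like creating a quota with size=2
assert quota.claim('job1') == 'job1'
assert quota.claim('job2') == 'job2'
assert quota.claim('job3') is None  # a third concurrent job must wait
quota.release('job1')
assert quota.claim('job3') == 'job3'
```

With ``size=1`` the same machinery yields the strict serialization shown in the doctest above.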
 Returning Jobs
 ==============
 
@@ -710,6 +641,7 @@
 other jobs; and to make parts of a job that can be parallelized available
 to more workers.
 
+---------------
 Serialized Work
 ---------------
 
@@ -743,6 +675,7 @@
 separation of code: dividing code that does work from code that
 orchestrates the jobs.  We'll see an example of the idea below.
 
+-----------------
 Parallelized Work
 -----------------
 
@@ -848,64 +781,16 @@
     >>> job.result
     '200 OK'
 
-Logging Agents
-==============
+Conclusion
+==========
 
-Agents log when they get a job and when they complete a job.  They also can
-log every given number of polls.
+This concludes our discussion of zc.async usage.  The next section shows
+how to configure zc.async without Zope 3[#stop_reactor]_.
 
-Additional Agents
-=================
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
 
-A process can host many different agents, and many processes can provide
-workers for a queue.
-
-Handling Failed Agents
-======================
-
-...worker finds another process already installed with same UUID and
-name; could be shutdown error (ghost of self) or really another
-process...show engineUUID... some discussion already in
-datamanager.txt...
-
-Advice
-======
-
-Avoid Conflict Errors
----------------------
-
-...try to only mutate job, or contemplate serializing...
-
-Gotchas
-=======
-
-...some callbacks may still be working when job is completed.  Therefore
-job put in `completed` for worker so that it can have a chance to run to
-completion.
-
-    >>> reactor.stop()
-
-=========
-Footnotes
-=========
-
-.. [#other_packages] Another Zope 3 package that approaches somewhat
-    similar use cases to these is lovely.remotetask
-    (http://svn.zope.org/lovely.remotetask/).
-    
-    Another set of use cases center around scheduling: we need to retry an
-    asynchronous task after a given amount of time; or we want to have a
-    requested job happen late at night; or we want to have a job happen
-    regularly.  The Zope 3 scheduler
-    (http://svn.zope.org/Zope3/trunk/src/scheduler/) approaches the last of
-    these tasks with more infrastructure than zc.async, as arguably does a
-    typical "cron wget" approach.  However, both approaches are prone to
-    serious problems when the scheduled task takes more time than expected,
-    and one instance of a task overlaps the previous one, sometimes causing
-    disastrous problems.  By using zc.async jobs to represent the
-    pending result, and even to schedule the next call, this problem can be
-    alleviated.
-
 .. [#history] The first generation, zasync, had the following goals:
 
     - be scalable, so that another process or machine could do the
@@ -936,8 +821,8 @@
 
         The zasync design has three main components, as divided by their
         roles: persistent deferreds, now called jobs; job queues (the
-        original zasync's "asynchronous call manager"); and asynchronous
-        workers (the original zasync ZEO client).  The zasync 1.x design
+        original zasync's "asynchronous call manager"); and dispatchers
+        (the original zasync ZEO client).  The zasync 1.x design
         blurred the lines between the three components such that the
         component parts could only be replaced with difficulty, if at
         all. A goal for the 2.x design is to clearly define the role for
@@ -1035,83 +920,11 @@
         Retries and other use cases make time-delayed deferred calls
         desirable. The new design supports these sort of calls.
 
-.. [#identifying_agent] Generally, the combination of a queue name plus a
+.. [#identifying_agent] The combination of a queue name plus a
     dispatcher UUID plus an agent name uniquely identifies an agent.
 
-.. [#uuid] UUIDs are generated by http://zesty.ca/python/uuid.html, as
-    incorporated in Python 2.5.  They are expected to be found in 
-    os.path.join(os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt');
-    this file will be created and populated with a new UUID if it does
-    not exist.
+.. [#usageSetUp] We set up the configuration for our usage examples here.
 
-.. [#subscribers] The zc.async.subscribers module provides two different
-    subscribers to set up a datamanager.  One subscriber expects to put
-    the object in the same database as the main application
-    (`zc.async.subscribers.basicInstallerAndNotifier`).  This is the
-    default, and should probably be used if you are a casual user.
-    
-    The other subscriber expects to put the object in a secondary
-    database, with a reference to it in the main database
-    (`zc.async.subscribers.installerAndNotifier`).  This approach keeps
-    the database churn generated by zc.async, which can be significant,
-    separate from your main data.  However, it also requires that you
-    set up two databases in your zope.conf (or equivalent, if this is
-    used outside of Zope 3).  And possibly even more onerously, it means
-    that persistent objects used for calls must either already be
-    committed, or be explicitly added to a connection; otherwise you
-    will get an InvalidObjectReference (see
-    cross-database-references.txt in the ZODB package).  The possible
-    annoyances may be worth it to someone building a more demanding
-    application.
-    
-    Again, the first subscriber is the easier to use, and is the default.
-    You can use either one (or your own).
-
-    If you do want to use the second subscriber, here's a start on what
-    you might need to do in your zope.conf.  In a Zope without ZEO you
-    would set something like this up.
-
-    <zodb>
-      <filestorage>
-        path $DATADIR/Data.fs
-      </filestorage>
-    </zodb>
-    <zodb zc.async>
-      <filestorage>
-        path $DATADIR/zc.async.fs
-      </filestorage>
-    </zodb>
-
-    For ZEO, you could have the two databases on one server...
-    
-    <filestorage 1>
-      path Data.fs
-    </filestorage>
-    <filestorage 2>
-      path zc.async.fs
-    </filestorage>
-    
-    ...and then set up ZEO clients something like this.
-    
-    <zodb>
-      <zeoclient>
-        server localhost:8100
-        storage 1
-        # ZEO client cache, in bytes
-        cache-size 20MB
-      </zeoclient>
-    </zodb>
-    <zodb zc.async>
-      <zeoclient>
-        server localhost:8100
-        storage 2
-        # ZEO client cache, in bytes
-        cache-size 20MB
-      </zeoclient>
-    </zodb>
-
-.. [#setup_adapters]
-
     You must have two adapter registrations: IConnection to
     ITransactionManager, and IPersistent to IConnection.  We will also
     register IPersistent to ITransactionManager because the adapter is
@@ -1143,15 +956,142 @@
     ...     provides=zc.async.interfaces.IJob)
     ...
 
-.. [#handlers] In the second footnote above, the text describes two
-    available subscribers.  When this documentation is run as a test, it
-    is run twice, once with each.  To accomodate this, in our example
-    below we appear to pull the "installerAndNotifier" out of the air:
-    it is installed as a global when the test is run.
+    We'll use a test reactor that we can control.
 
+    >>> import zc.async.testing
+    >>> reactor = zc.async.testing.Reactor()
+    >>> reactor.start() # this monkeypatches datetime.datetime.now
+
+    We need to instantiate the dispatcher with a reactor and a DB.  We
+    have the reactor, so here is the DB.  We use a FileStorage rather
+    than a MappingStorage variant typical in tests and examples because
+    we want MVCC.
+
+    >>> import ZODB.FileStorage
+    >>> storage = ZODB.FileStorage.FileStorage(
+    ...     'zc_async.fs', create=True)
+    >>> from ZODB.DB import DB 
+    >>> db = DB(storage) 
+    >>> conn = db.open()
+    >>> root = conn.root()
+
+    Now let's create the mapping of queues, and a single queue.
+
+    >>> import zc.async.queue
+    >>> import zc.async.interfaces
+    >>> mapping = root[zc.async.interfaces.KEY] = zc.async.queue.Queues()
+    >>> queue = mapping[''] = zc.async.queue.Queue()
+    >>> import transaction
+    >>> transaction.commit()
+
+    The dispatcher will look for a UUID utility.
+    
+    >>> from zc.async.instanceuuid import UUID
+    >>> import zope.component
+    >>> zope.component.provideUtility(
+    ...     UUID, zc.async.interfaces.IUUID, '')
+
+    Now we can instantiate, activate, and perform some reactor work in order
+    to let the dispatcher register with the queue.
+
+    >>> import zc.async.dispatcher
+    >>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
+    >>> dispatcher.UUID == UUID
+    True
+    >>> dispatcher.activate()
+    >>> reactor.time_flies(1)
+    1
+
+    Here's an agent named 'main'
+
+    >>> import zc.async.agent
+    >>> agent = zc.async.agent.Agent()
+    >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
+    >>> agent.chooser is zc.async.agent.chooseFirst
+    True
+    >>> agent.size
+    3
+    >>> transaction.commit()
+
+.. [#wait_for] This is our helper function.  It relies on the test fixtures
+    set up in the previous footnote.
+
+    >>> import time
+    >>> def wait_for(*jobs, **kwargs):
+    ...     reactor.time_flies(dispatcher.poll_interval) # starts thread
+    ...     # now we wait for the thread
+    ...     for i in range(kwargs.get('attempts', 10)):
+    ...         while reactor.time_passes():
+    ...             pass
+    ...         transaction.begin()
+    ...         for j in jobs:
+    ...             if j.status != zc.async.interfaces.COMPLETED:
+    ...                 break
+    ...         else:
+    ...             break
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         print 'TIME OUT'
+    ...
+
+.. [#commit_for_multidatabase] We commit before we do the next step as a
+    good practice, in case the queue is from a different database than
+    the root.  See the `Tips and Tricks`_ section for a discussion about
+    why putting the queue in another database might be a good idea. 
+    
+    Rather than committing the transaction,
+    ``root._p_jar.add(root['demo'])`` would also accomplish the same
+    thing from a multi-database perspective, without a commit.  It was
+    not used in the example because the author judged the
+    ``transaction.commit()`` to be less jarring to the reader.  If you
+    are down here reading this footnote, maybe the author was wrong. :-)
+
+.. [#already_passed]
+
+    >>> t = transaction.begin()
+    >>> job = queue.put(
+    ...     send_message, datetime.datetime(2006, 8, 10, 15, tzinfo=pytz.UTC))
+    >>> transaction.commit()
+    >>> wait_for(job)
+    imagine this sent a message to another machine
+    
+    It's worth noting that this situation constitutes a small exception
+    in the handling of scheduled calls.  Scheduled calls usually get
+    preference when jobs are handed out over normal non-scheduled "as soon as
+    possible" jobs.  However, setting the begin_after date to an earlier
+    time puts the job at the end of the (usually) FIFO queue of non-scheduled
+    tasks: it is treated exactly as if the date had not been specified.
+
+.. [#already_passed_timed_out]
+
+    >>> t = transaction.begin()
+    >>> job = queue.put(
+    ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
+    >>> transaction.commit()
+    >>> wait_for(job)
+    >>> job.result
+    <twisted.python.failure.Failure zc.async.interfaces.AbortedError>
+    >>> import sys
+    >>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    Failure: zc.async.interfaces.AbortedError:
+
 .. [#job] The Job class can take arguments and keyword arguments
     for the wrapped callable at call time as well, similar to Python
     2.5's `partial`.  This will be important when we use the Job as
     a callback.  For this use case, though, realize that the job
     will be called with no arguments, so you must supply all necessary
     arguments for the callable on creation time.
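    The comparison to ``partial`` can be seen directly with the standard
    library.  The sketch below uses ``functools.partial`` rather than the
    Job class itself; ``send_message`` and its arguments are illustrative.

```python
import functools

def send_message(recipient, body, urgent=False):
    return (recipient, body, urgent)

# Like constructing a Job with arguments: everything is bound at
# creation time, and the resulting callable takes no arguments.
job_like = functools.partial(send_message, 'addr', 'hi', urgent=True)
assert job_like() == ('addr', 'hi', True)
```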
+
+.. [#setLiveAnnotation]  Here's the real rule, which is more complex.
+    *Do not send non-persistent mutables or a persistent.Persistent
+    object without a connection, unless you do not refer to it again in
+    the current job.*
+
+.. [#getLiveAnnotation] Here's the real rule. *To prevent surprising
+    errors, do not request an annotation that might be a persistent
+    object.*
+
+.. [#stop_reactor] 
+
+    >>> reactor.stop()

Added: zc.async/branches/dev/src/zc/async/README_2.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README_2.txt	                        (rev 0)
+++ zc.async/branches/dev/src/zc/async/README_2.txt	2008-03-20 23:59:06 UTC (rev 84809)
@@ -0,0 +1,634 @@
+============================
+Configuration Without Zope 3
+============================
+
+This section discusses setting up zc.async without Zope 3.  Since Zope 3
+is ill-defined, we will be more specific: this describes setting up
+zc.async without ZCML, without any zope.app packages, and with as few
+dependencies as possible.  A casual way of describing the dependencies
+is "ZODB and zope.component"[#specific_dependencies]_.
+
+The next section, `Configuration With Zope 3`_, still tries to limit
+dependencies, but includes both ZCML and indirect and direct
+dependencies on such packages as zope.publisher and zope.app.appsetup.
+
+Configuration has three basic parts: component registrations, ZODB
+setup, and ZODB configuration.
+
+Component Registrations
+=======================
+
+Some registrations are required, and some are optional.  Since they are
+component registrations, even for the required registrations, other
+implementations are possible.
+
+--------
+Required
+--------
+
+You must have three adapter registrations: IConnection to
+ITransactionManager, IPersistent to IConnection, and IPersistent to
+ITransactionManager.
+
+The ``zc.twist`` package provides all of these adapters.  However,
+zope.app.keyreference also provides a version of the ``connection`` adapter
+that is identical or very similar, and that should work fine if you are 
+already using that package in your application.
+
+    >>> from zc.twist import transactionManager, connection
+    >>> import zope.component
+    >>> zope.component.provideAdapter(transactionManager)
+    >>> zope.component.provideAdapter(connection)
+    >>> import ZODB.interfaces
+    >>> zope.component.provideAdapter(
+    ...     transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+We also need to be able to adapt functions and methods to jobs.  The
+zc.async.job.Job class is the expected implementation.
+
+    >>> import types
+    >>> import zc.async.interfaces
+    >>> import zc.async.job
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.Job,
+    ...     adapts=(types.FunctionType,),
+    ...     provides=zc.async.interfaces.IJob)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.Job,
+    ...     adapts=(types.MethodType,),
+    ...     provides=zc.async.interfaces.IJob)
+    ...
+
+--------
+Optional
+--------
+
+UUID
+----
+
+The dispatcher will look for a UUID utility if a UUID is not specifically
+provided to its constructor.
+    
+    >>> from zc.async.instanceuuid import UUID
+    >>> zope.component.provideUtility(
+    ...     UUID, zc.async.interfaces.IUUID, '')
+
+The UUID we register here is a UUID of the instance, which is expected
+to uniquely identify the process when in production. It is stored in
+INSTANCE_HOME/etc/uuid.txt.
+
+    >>> import uuid
+    >>> import os
+    >>> f = open(os.path.join(
+    ...     os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt'))
+    >>> uuid_hex = f.readline().strip()
+    >>> f.close()
+    >>> uuid = uuid.UUID(uuid_hex)
+    >>> UUID == uuid
+    True
+
+The uuid.txt file is intended to stay in the instance home as a
+persistent identifier.
+
+Queue Adapter
+-------------
+
+You may want to set up an adapter from persistent objects to a named queue.
+The zc.async.queue.getDefaultQueue adapter is a reasonable approach.
+
+    >>> import zc.async.queue
+    >>> zope.component.provideAdapter(zc.async.queue.getDefaultQueue)
+
+This returns the queue named '' (empty string).
+
+Agent Subscribers
+-----------------
+
+As we'll see below, the dispatcher fires an event when it registers with
+a queue, and another when it activates the queue.  These events give you
+the opportunity to register subscribers to add one or more agents to a
+queue, to tell the dispatcher what jobs to perform.
+zc.async.agent.addMainAgentActivationHandler is a reasonable starter: it
+adds a single agent named 'main' if one does not exist.  The agent has a
+simple indiscriminate FIFO policy for the queue.  If you want to write
+your own subscriber, this function is a good model.
+
+Agents are an important part of the ZODB configuration, and so are described
+more in depth below.
+
+    >>> import zc.async.agent
+    >>> zope.component.provideHandler(
+    ...     zc.async.agent.addMainAgentActivationHandler)
+
+This subscriber is registered for the IDispatcherActivated event; another
+approach might use the IDispatcherRegistered event.
+
+Database Startup Subscribers
+----------------------------
+
+Typically you will want to start the reactor, if necessary, and instantiate
+and activate the dispatcher when the database is ready.  Depending on your
+application, this can be done in-line with your start up code, or with a
+subscriber to some event.
+
+Zope 3 provides an event, zope.app.appsetup.interfaces.IDatabaseOpenedEvent,
+that the Zope 3 configuration uses.  You may also want to follow this kind
+of pattern.
+
+For our example, we will start the dispatcher in-line (see the beginning of
+the `ZODB Configuration`_ section).
+
+ZODB Setup
+==========
+
+--------------------
+Storage and DB Setup
+--------------------
+
+On a basic level, zc.async needs a setup that supports good conflict
+resolution.  Most or all production ZODB storages now have the necessary
+APIs to support MVCC.  You should also make sure that your ZEO server
+has all the code that includes conflict resolution, such as zc.queue.
+
+A more subtle decision is whether to use multiple databases.  The zc.async
+dispatcher can generate a lot of database churn.  It may be wise to put the
+queue in a separate database from your content database(s).  
+
+The downsides to this option include the fact that you must be careful to
+specify to which database objects belong; and that broken cross-database
+references are not handled gracefully in the ZODB as of this writing.
+
+We will use multiple databases for our example here.  See the footnote in
+the usage section that sets up the tests for a non-multiple database
+approach.
+
+(We use a FileStorage rather than a MappingStorage variant typical in
+tests and examples because we want MVCC, as mentioned above.)
+
+    >>> databases = {}
+    >>> import ZODB.FileStorage
+    >>> storage = ZODB.FileStorage.FileStorage(
+    ...     'main.fs', create=True)
+    
+    >>> async_storage = ZODB.FileStorage.FileStorage(
+    ...     'async.fs', create=True)
+
+    >>> from ZODB.DB import DB 
+    >>> databases[''] = db = DB(storage)
+    >>> databases['async'] = async_db = DB(async_storage)
+    >>> async_db.databases = db.databases = databases
+    >>> db.database_name = ''
+    >>> async_db.database_name = 'async'
+    >>> conn = db.open()
+    >>> root = conn.root()
+
+---------
+DB layout
+---------
+
+Dispatchers look for queues in a mapping off the root of the database in 
+a key defined as a constant: zc.async.interfaces.KEY.  This mapping should
+generally be a zc.async.queue.Queues object.
+
+If we were not using a multi-database for our example, we could simply install
+the queues mapping with this line:
+``root[zc.async.interfaces.KEY] = zc.async.queue.Queues()``.  We will need
+something a bit more baroque.  We will add the queues mapping to the 'async'
+database, and then make it available in the main database ('') with the proper
+key.
+
+    >>> conn2 = conn.get_connection('async')
+    >>> queues = conn2.root()['mounted_queues'] = zc.async.queue.Queues()
+
+Note that the 'mounted_queues' key in the async database is arbitrary:
+what we care about is the key in the database that the dispatcher will
+see.
+
+Now we add the object explicitly to conn2, so that the ZODB will know the
+"real" database in which the object lives, even though it will be also
+accessible from the main database.
+
+    >>> conn2.add(queues)
+    >>> root[zc.async.interfaces.KEY] = queues
+    >>> import transaction
+    >>> transaction.commit()
+
+Now we need to put a queue in the queues collection.  We can have more than
+one, as discussed below, but we suggest a convention of the primary queue
+being available in a key of '' (empty string).
+
+    >>> queue = queues[''] = zc.async.queue.Queue()
+    >>> transaction.commit()
+
+We can now get the queue with the optional adapter from IPersistent to IQueue
+above.
+
+    >>> queue is zc.async.interfaces.IQueue(root)
+    True
+
+ZODB Configuration
+==================
+
+Now we can start the reactor, and start the dispatcher.  As noted above,
+in some applications this may be done with an event subscriber.  We will
+do it inline.
+
+Any object that conforms to the specification of zc.async.interfaces.IReactor
+will be usable by the dispatcher.  For our example, we will use our own instance
+of the Twisted select-based reactor running in a separate thread.  This is
+separate from the Twisted reactor installed in twisted.internet.reactor, and
+so this approach can be used with an application that does not otherwise use
+Twisted (for instance, a Zope application using the "classic" zope publisher).
+
+The testing module also has a reactor on which the `Usage` section relies.
+
+Configuring the basics is fairly simple, as we'll see in a moment.  The
+trickiest part is to handle signals cleanly.  Here we install signal
+handlers in the main thread using ``reactor._handleSignals``.  This may
+work in some real-world applications, but if your application already
+needs to handle signals you may need a more careful approach.  The Zope
+3 configuration has some options you can explore.  
+
+    >>> import twisted.internet.selectreactor
+    >>> reactor = twisted.internet.selectreactor.SelectReactor()
+    >>> reactor._handleSignals()
+
+Now we are ready to instantiate our dispatcher.
+
+    >>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
+
+Notice it has the uuid defined in instanceuuid.
+
+    >>> dispatcher.UUID == UUID
+    True
+
+Now we can start the reactor and the dispatcher in a thread.
+
+    >>> import threading
+    >>> def start():
+    ...     dispatcher.activate()
+    ...     reactor.run(installSignalHandlers=0)
+    ...
+    >>> thread = threading.Thread(target=start)
+    >>> thread.setDaemon(True)
+
+    >>> thread.start()
+
+The dispatcher should be starting up now.  Let's wait for it to activate.
+We're using a test convenience, get_poll, defined in the footnotes
+[#get_poll]_.
+
+    >>> poll = get_poll(0)
+
+We're off!  The events have been fired for registering and activating the
+dispatcher.  Therefore, our subscriber to add our agent has fired.
+
+We need to begin our transaction to synchronize our view of the database.
+
+    >>> t = transaction.begin()
+
+We get the collection of dispatcher agents from the queue, using the UUID.
+
+    >>> dispatcher_agents = queue.dispatchers[UUID]
+
+It has one agent--the one placed by our subscriber.
+
+    >>> dispatcher_agents.keys()
+    ['main']
+    >>> agent = dispatcher_agents['main']
+
+Now we have our agent!  But...what is it[#stop_reactor]_?
+
+------
+Agents
+------
+
+Agents are the way you control what a dispatcher's worker threads do.  They
+pick the jobs and assign them to their dispatcher when the dispatcher asks.
+
+*If a dispatcher does not have any agents in a given queue, it will not
+perform any tasks for that queue.*
+
+We currently have an agent that simply asks for the next available FIFO job.
+We are using an agent implementation that allows you to specify a callable to
+choose the job.  That callable is now zc.async.agent.chooseFirst.
+
+    >>> agent.chooser is zc.async.agent.chooseFirst
+    True
+
+Here's the entire implementation of that function::
+
+    def chooseFirst(agent):
+        return agent.queue.claim()
+
+What would another agent do?  Well, it might pass a filter function to
+``claim``.  This function takes a job and returns a value evaluated as a
+boolean.  For instance, let's say we always wanted a certain number of
+threads available for working on a particular call; for the purpose of
+example, we'll use ``operator.mul``, though a more real-world example
+might be a network call or a particular call in your application.
+
+    >>> import operator
+    >>> def chooseMul(agent):
+    ...     return agent.queue.claim(lambda job: job.callable is operator.mul)
+    ...
+
+Another variant would prefer operator.mul, but if one is not in the queue,
+it will take any.
+
+    >>> def preferMul(agent):
+    ...     res = agent.queue.claim(lambda job: job.callable is operator.mul)
+    ...     if res is None:
+    ...         res = agent.queue.claim()
+    ...     return res
+    ...
+
+Other approaches might look at the current jobs in the agent, or the agent's
+dispatcher, and decide what jobs to prefer on that basis.  The agent
+implementation is designed to support many such policies.
+
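As a sketch of that idea, a chooser might refuse to take on more than a fixed number of a given call at once, based on the jobs the agent is already working on.  The stub classes below are toy stand-ins, not the real zc.async objects; only the ``claim(filter)`` pattern mirrors the API shown above.

```python
import operator

class StubJob:
    def __init__(self, callable):
        self.callable = callable

class StubQueue:
    def __init__(self, jobs):
        self.jobs = list(jobs)

    def claim(self, filter=None):
        # Return the first job accepted by the filter, FIFO style.
        for job in self.jobs:
            if filter is None or filter(job):
                self.jobs.remove(job)
                return job
        return None

class StubAgent:
    def __init__(self, queue, current):
        self.queue = queue
        self.current = current  # jobs the agent is already working on

def chooseCapped(agent, limit=1):
    # Decline another operator.mul job if `limit` are already in progress.
    running = sum(1 for j in agent.current if j.callable is operator.mul)
    if running >= limit:
        return agent.queue.claim(lambda job: job.callable is not operator.mul)
    return agent.queue.claim()

queue = StubQueue([StubJob(operator.mul), StubJob(operator.add)])
busy = StubAgent(queue, current=[StubJob(operator.mul)])
job = chooseCapped(busy)
assert job.callable is operator.add  # the mul job is left for another worker
```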
+Let's set up another agent, in addition to the ``chooseFirst`` one, that has
+the ``preferMul`` policy.
+
+    >>> agent2 = dispatcher_agents['mul'] = zc.async.agent.Agent(preferMul)
+
+Another characteristic of agents is that they specify how many jobs they
+should pick at a time.  The dispatcher actually adjusts the size of the
+ZODB connection pool to accommodate its agents' size.  The default is 3.
+
+    >>> agent.size
+    3
+    >>> agent2.size
+    3
+
+We'll manipulate that a little later.
+
+Finally, it's worth noting that agents contain the jobs that are
+currently being worked on by the dispatcher on their behalf; and have a
+``completed`` collection of the most recently completed jobs, beginning
+with the most recent.
+
+---------------
+Multiple Queues
+---------------
+
+Since we put our queues in a mapping of them, we can also create multiple
+queues.  This can make some scenarios more convenient and simpler to reason
+about.  For instance, while you might have agents filtering jobs as we
+describe above, it might be simpler to say that you have a queue for one kind
+of job--say, processing a video file or an audio file--and a queue for other
+kinds of jobs.  Then it is easy and obvious to set up simple FIFO agents
+as desired for different dispatchers.  The same kind of logic could be
+accomplished with agents, but it is easier to picture the multiple queues.
+
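The routing idea can be sketched without zc.async at all: give each kind of work its own FIFO, and let each worker drain only the queue it is configured for.  The queue names and job strings below are illustrative only.

```python
from collections import deque

# One FIFO per kind of work, keyed by queue name, as in the queues mapping.
queues = {'': deque(), 'media': deque()}

def put(name, job):
    queues[name].append(job)

def claim(name):
    # A dispatcher whose agents are only in `name` sees only that queue.
    return queues[name].popleft() if queues[name] else None

put('media', 'transcode video')
put('', 'send mail')
assert claim('media') == 'transcode video'
assert claim('media') is None        # the media worker never sees mail jobs
assert claim('') == 'send mail'
```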
+------
+Quotas
+------
+
+We touched on quotas in the usage section.  Some jobs will need to
+access resources that are shared across processes.  A central data
+structure such as an index in the ZODB is a prime example, but other
+examples might include a network service that only allows a certain
+number of concurrent connections.  These scenarios can be helped by
+quotas.
+
+Quotas are demonstrated in the usage section.  For configuration, you
+should know these characteristics:
+
+- you cannot add a job with a quota name that is not defined in the queue;
+
+    >>> import operator
+    >>> import zc.async.job
+    >>> job = zc.async.job.Job(operator.mul, 5, 2)
+    >>> job.quota_names = ['content catalog']
+    >>> job.quota_names
+    ('content catalog',)
+    >>> queue.put(job)
+    Traceback (most recent call last):
+    ...
+    ValueError: ('unknown quota name', 'content catalog')
+    >>> len(queue)
+    0
+
+- you cannot add a quota name to a job in a queue if the quota name is not
+  defined in the queue;
+
+    >>> job.quota_names = ()
+    >>> job is queue.put(job)
+    True
+    >>> job.quota_names = ('content catalog',)
+    Traceback (most recent call last):
+    ...
+    ValueError: ('unknown quota name', 'content catalog')
+    >>> job.quota_names
+    ()
+
+- you can create and remove quotas on the queue;
+
+    >>> list(queue.quotas)
+    []
+    >>> queue.quotas.create('testing')
+    >>> list(queue.quotas)
+    ['testing']
+    >>> queue.quotas.remove('testing')
+    >>> list(queue.quotas)
+    []
+
+- you can remove quotas if pending jobs have their quota names--the quota name
+  is then ignored;
+
+    >>> queue.quotas.create('content catalog')
+    >>> job.quota_names = ('content catalog',)
+    >>> queue.quotas.remove('content catalog')
+    >>> job.quota_names
+    ('content catalog',)
+    >>> job is queue.claim()
+    True
+    >>> len(queue)
+    0
+
+- quotas default to a size of 1;
+
+    >>> queue.quotas.create('content catalog')
+    >>> queue.quotas['content catalog'].size
+    1
+
+- this can be changed at creation or later; and
+
+    >>> queue.quotas['content catalog'].size = 2
+    >>> queue.quotas['content catalog'].size
+    2
+    >>> queue.quotas.create('frobnitz account', size=3)
+    >>> queue.quotas['frobnitz account'].size
+    3
+
+- decreasing the size of a quota while it is filled will not affect the
+  currently running jobs.
+
+    >>> job1 = zc.async.job.Job(operator.mul, 5, 2)
+    >>> job2 = zc.async.job.Job(operator.mul, 5, 2)
+    >>> job3 = zc.async.job.Job(operator.mul, 5, 2)
+    >>> job1.quota_names = job2.quota_names = job3.quota_names = (
+    ...     'content catalog',)
+    >>> job1 is queue.put(job1)
+    True
+    >>> job2 is queue.put(job2)
+    True
+    >>> job3 is queue.put(job3)
+    True
+    >>> job1 is queue.claim()
+    True
+    >>> job2 is queue.claim()
+    True
+    >>> print queue.claim()
+    None
+    >>> quota = queue.quotas['content catalog']
+    >>> len(quota)
+    2
+    >>> list(quota) == [job1, job2]
+    True
+    >>> quota.filled
+    True
+    >>> quota.size = 1
+    >>> quota.filled
+    True
+    >>> print queue.claim()
+    None
+    >>> job1()
+    10
+    >>> print queue.claim()
+    None
+    >>> len(quota)
+    1
+    >>> list(quota) == [job2]
+    True
+    >>> job2()
+    10
+    >>> job3 is queue.claim()
+    True
+    >>> list(quota) == [job3]
+    True
+    >>> len(quota)
+    1
+    >>> job3()
+    10
+    >>> print queue.claim()
+    None
+    >>> len(queue)
+    0
+    >>> quota.clean()
+    >>> len(quota)
+    0
+    >>> quota.filled
+    False
+
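The behavior just demonstrated--claims gated by size, and a shrunken quota
that does not evict running jobs--can be summarized with a tiny stand-in
class (a sketch of the semantics only, not zc.async's implementation):

```python
class ToyQuota:
    # Models the semantics shown above: ``size`` limits concurrent
    # jobs, and shrinking ``size`` below the current load does not
    # evict jobs that are already running.
    def __init__(self, size=1):
        self.size = size
        self.active = []

    @property
    def filled(self):
        return len(self.active) >= self.size

    def claim(self, job):
        if self.filled:
            return None  # like queue.claim() returning None
        self.active.append(job)
        return job

    def finish(self, job):
        self.active.remove(job)

quota = ToyQuota(size=2)
quota.claim('job1')
quota.claim('job2')
quota.size = 1               # shrink while two jobs are running
assert quota.filled          # still filled...
quota.finish('job1')
assert quota.filled          # ...even with one job left, at size 1
quota.finish('job2')
assert quota.claim('job3') == 'job3'
```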
+Additional Topics: Logging and Monitoring
+=========================================
+
+XXX see monitor.txt for sketch of zc.z3monitor monitoring.
+
+    >>> reactor.stop()
+
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
+.. [#specific_dependencies]  More specifically, as of this writing,
+    these are the minimal egg dependencies (including indirect
+    dependencies):
+
+    - pytz
+        A Python time zone library
+    
+    - rwproperty
+        A small package of descriptor conveniences
+    
+    - uuid
+        The uuid module included in Python 2.5
+    
+    - zc.dict
+        A ZODB-aware dict implementation based on BTrees.
+    
+    - zc.queue
+        A ZODB-aware queue
+    
+    - zc.twist
+        Conveniences for working with Twisted and the ZODB
+    
+    - zc.twisted
+        A setuptools-friendly Twisted distribution, hopefully to be replaced
+        with a normal Twisted distribution when it is ready.
+    
+    - ZConfig
+        A general configuration package from the Zope project, used by the
+        ZODB tests.
+    
+    - zdaemon
+        A general daemon tool coming from the Zope project.
+    
+    - ZODB3
+        The Zope Object Database.
+    
+    - zope.bforest
+        Aggregations of multiple BTrees into a single dict-like structure,
+        reasonable for rotating data structures, among other purposes.
+    
+    - zope.component
+        A way to hook together code by contract.
+    
+    - zope.deferredimport
+        A way to defer imports in Python packages, often to prevent circular
+        import problems.
+    
+    - zope.deprecation
+        A small framework for deprecating features.
+    
+    - zope.event
+        An exceedingly small event framework that derives its power from
+        zope.component.
+    
+    - zope.i18nmessageid
+        A way to specify strings to be translated.
+    
+    - zope.interface
+        A way to specify code contracts and other data structures.
+    
+    - zope.proxy
+        A way to proxy other Python objects.
+    
+    - zope.testing
+        Testing extensions and helpers.
+
+.. [#get_poll]
+
+    >>> import time
+    >>> def get_poll(count=None):
+    ...     if count is None:
+    ...         count = len(dispatcher.polls)
+    ...     for i in range(30):
+    ...         if len(dispatcher.polls) > count:
+    ...             return dispatcher.polls.first()
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         assert False, 'no poll!'
+    ... 
+
+.. [#stop_reactor] We don't want the live dispatcher for our demos, actually.
+    See dispatcher.txt to see the live dispatcher actually in use.
+
+    >>> reactor.callFromThread(reactor.stop)
+    >>> for i in range(30):
+    ...     if not dispatcher.activated:
+    ...         break
+    ...     time.sleep(0.1)
+    ... else:
+    ...     assert False, 'dispatcher did not deactivate'
+    ...
+
+    Now, we'll restart with an explicit reactor.
+    
+    >>> import zc.async.testing
+    >>> reactor = zc.async.testing.Reactor()
+    >>> dispatcher.reactor = reactor
+    >>> dispatcher.activate()
+    >>> reactor.start()


Property changes on: zc.async/branches/dev/src/zc/async/README_2.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: zc.async/branches/dev/src/zc/async/agent.py
===================================================================
--- zc.async/branches/dev/src/zc/async/agent.py	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/agent.py	2008-03-20 23:59:06 UTC (rev 84809)
@@ -2,6 +2,7 @@
 import datetime
 
 import zope.interface
+import zope.component
 
 import zc.async.interfaces
 import zc.async.utils
@@ -15,8 +16,7 @@
 
     zope.interface.implements(zc.async.interfaces.IAgent)
 
-    def __init__(self, name='', chooser=chooseFirst, size=3):
-        self.name = name
+    def __init__(self, chooser=chooseFirst, size=3):
         self.chooser = chooser
         self.size = size
         self._data = zc.queue.PersistentQueue()
@@ -61,3 +61,9 @@
     def jobCompleted(self, job):
         self.remove(job)
         self.completed.add(job)
+
+ at zope.component.adapter(zc.async.interfaces.IDispatcherActivated)
+def addMainAgentActivationHandler(event):
+    da = event.object
+    if 'main' not in da:
+        da['main'] = Agent()

Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py	2008-03-20 23:59:06 UTC (rev 84809)
@@ -16,18 +16,19 @@
 import zc.async.interfaces
 
 def _get(reactor, job, name, default, timeout, poll, deferred, start=None):
+    now = time.time()
     if start is None:
-        start = time.time()
+        start = now
     if name in job.annotations:
         res = job.annotations[name]
-    elif start + timeout < time.time():
+    elif start + timeout < now:
         res = default
     else:
         partial = zc.twist.Partial(
             _get, reactor, job, name, default, timeout, poll, deferred,
             start)
         partial.setReactor(reactor)
-        reactor.callLater(poll, partial)
+        reactor.callLater(min(poll, start + timeout - now), partial)
         return
     deferred.setResult(res)
 
@@ -65,7 +66,7 @@
         if job is None:
             job = self.job
         partial = zc.twist.Partial(
-            job.annotations.__setitem__, name, value) # XXX NO
+            job.annotations.__setitem__, name, value)
         partial.setReactor(self.dispatcher.reactor)
         self.dispatcher.reactor.callFromThread(partial)
 
@@ -176,9 +177,10 @@
 
     activated = False
 
-    def __init__(self, reactor, db, poll_interval=5, uuid=None):
-        self.reactor = reactor
+    def __init__(self, db, reactor, poll_interval=5, uuid=None):
         self.db = db
+        self.reactor = reactor # we may allow the ``reactor`` argument to be
+        # None at some point, to default to the installed Twisted reactor.
         self.poll_interval = poll_interval
         if uuid is None:
             uuid = zope.component.getUtility(zc.async.interfaces.IUUID)

Modified: zc.async/branches/dev/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.txt	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/dispatcher.txt	2008-03-20 23:59:06 UTC (rev 84809)
@@ -62,7 +62,7 @@
 
     >>> import zc.async.dispatcher
     >>> dispatcher = zc.async.dispatcher.Dispatcher(
-    ...     reactor, db, poll_interval=0.5)
+    ...     db, reactor, poll_interval=0.5)
     >>> dispatcher.UUID is UUID
     True
     >>> dispatcher.reactor is reactor
@@ -198,7 +198,7 @@
     11
 
 We can actually get it to perform some jobs now.  Here's a silly simple
-one.  We use a test convenience , wait_for_result, defined in the footnotes
+one.  We use a test convenience, wait_for_result, defined in the footnotes
 [#wait_for_result]_.
 
     >>> import operator

Modified: zc.async/branches/dev/src/zc/async/interfaces.py
===================================================================
--- zc.async/branches/dev/src/zc/async/interfaces.py	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/interfaces.py	2008-03-20 23:59:06 UTC (rev 84809)
@@ -5,6 +5,9 @@
 import zc.queue.interfaces
 from zc.async.i18n import _
 
+# TODO: these interfaces are not particularly complete.  The other
+# documentation is more accurate at the moment.
+
 KEY = 'zc.async'
 
 NEW = _('new-status', 'New')
@@ -14,7 +17,41 @@
 CALLBACKS = _('callback-status', 'Performing Callbacks')
 COMPLETED = _('completed-status', 'Completed')
 
+class IReactor(zope.interface.Interface):
+    """This describes what the dispatcher expects of the reactor.
+    
+    The reactor does not need to actually provide this interface."""
+    
+    def callFromThread(callable, *args, **kw):
+        """have callable run in reactor's thread, by reactor, ASAP.
+        
+        Intended to be called from a thread other than the reactor's main
+        loop.
+        """
+    
+    def callInThread(callable, *args, **kw):
+        """have callable run in a separate thread, ASAP.
+        
+        Must be called in same thread as reactor's main loop.
+        """
+    
+    def callLater(seconds, callable, *args, **kw):
+        """have callable run in reactor at least <seconds> from now
+        
+        Must be called in same thread as reactor's main loop.
+        """
 
+    def addSystemEventTrigger(phase, event, callable, *args, **kw):
+        """Install a callable to be run in phase of event.
+        
+        must support phase 'before', and event 'shutdown'.
+        """
+
+    def callWhenRunning(_callable, *args, **kw):
+        """run callable now if running, or when started.
+        """
+
+
 class IObjectEvent(zope.interface.Interface):
     """Event happened to object"""
     
@@ -134,7 +171,34 @@
         be untouched.  May only be called when job is in CALLBACKS state.
         State will be COMPLETED after this call."""
 
+    assignerUUID = zope.interface.Attribute(
+        """The UUID of the software instance that was in charge when the
+        IJob was put in an IJobQueue.  Should be assigned by
+        IJobQueue.put.""")
 
+#     selectedUUIDs = zope.interface.Attribute(
+#         """a set of selected worker UUIDs.  If it is empty, it is
+#         interpreted as the set of all available workerUUIDs.  Only
+#         workers with UUIDs in the set may perform it.
+# 
+#         If a worker would have selected this job for a run, but the
+#         difference of selected_workerUUIDs and excluded_workerUUIDs
+#         stopped it, it is responsible for verifying that the effective
+#         set of workerUUIDs intersects with the available workers; if the
+#         intersection contains no possible workers, the worker should
+#         call job.fail().""")
+
+    begin_after = zope.interface.Attribute(
+        """A datetime.datetime in UTC of the first time when the
+        job may run.  Cannot be set after job gets a data_manager.
+        """)
+
+    begin_by = zope.interface.Attribute(
+        """A datetime.timedelta of the duration after the begin_after
+        value after which the job will fail, if it has not already
+        begun.  Cannot be set after job has begun.""")
+
+
 class IAgent(zope.interface.common.sequence.IFiniteSequence):
     """Responsible for picking jobs and keeping track of them.
     
@@ -180,43 +244,6 @@
     def index(item):
         """return index, or raise ValueError if item is not in queue"""
 
-
-class IQueuedJob(IJob):
-    """An async job with all the necessary knobs to by put in a
-    datamanager."""
-
-    workerUUID = zope.interface.Attribute(
-        """The UUID of the IWorker who is, or was, responsible for this
-        job.  None initially.  Should be assigned by
-        IWorker.[reactor|thread].put.""")
-
-    assignerUUID = zope.interface.Attribute(
-        """The UUID of the software instance that was in charge when the
-        IJob was put in an IJobQueue.  Should be assigned by
-        IJobQueue.put.""")
-
-    selectedUUIDs = zope.interface.Attribute(
-        """a set of selected worker UUIDs.  If it is empty, it is
-        interpreted as the set of all available workerUUIDs.  Only
-        workers with UUIDs in the set may perform it.
-
-        If a worker would have selected this job for a run, but the
-        difference of selected_workerUUIDs and excluded_workerUUIDs
-        stopped it, it is responsible for verifying that the effective
-        set of workerUUIDs intersects with the available workers; if the
-        intersection contains no possible workers, the worker should
-        call job.fail().""")
-
-    begin_after = zope.interface.Attribute(
-        """A datetime.datetime in UTC of the first time when the
-        job may run.  Cannot be set after job gets a data_manager.
-        """)
-
-    begin_by = zope.interface.Attribute(
-        """A datetime.timedelta of the duration after the begin_after
-        value after which the job will fail, if it has not already
-        begun.  Cannot be set after job has begun.""")
-
 class IQueue(zc.queue.interfaces.IQueue):
 
     parent = zope.interface.Attribute(
@@ -286,34 +313,6 @@
     parent = zope.interface.Attribute(
         "")
 
-
-class IDataManager(zope.interface.Interface):
-    """Note that jobs added to queues are not guaranteed to run in
-    the order added.  For sequencing, use IJob.addCallbacks."""
-
-    thread = zope.interface.Attribute(
-        """An IJobQueue of IJobs that should be run in a thread.""")
-
-    reactor = zope.interface.Attribute(
-        """An IJobQueue of IJobs that should be run in the main
-        loop (e.g., Twisted's main reactor loop).""")
-
-    workers = zope.interface.Attribute(
-        """An IWorkers of registered IWorker objects for this data manager;
-        these objects represent processes that are responsible for claiming
-        and performing the IJobs in the data manager.""")
-
-    def checkSibling(uuid):
-        """check the next sibling of uuid to see if it is dead, according
-        to its last_poll, and remove the engineUUID and schedule removal of its
-        jobs if it is dead."""
-
-class IDataManagerAvailableEvent(zope.component.interfaces.IObjectEvent):
-    """object is data manager"""
-
-class DataManagerAvailable(zope.component.interfaces.ObjectEvent):
-    zope.interface.implements(IDataManagerAvailableEvent)
-
 class FullError(Exception):
     """Container is full.
     """
@@ -369,47 +368,6 @@
     def __nonzero__():
         "whether collection contains any jobs"
 
-class IWorker(zope.interface.Interface):
-
-    reactor = zope.interface.Attribute(
-        """An ISizedQueue of reactor jobs currently being worked on by this
-        worker.""")
-
-    thread = zope.interface.Attribute(
-        """An ISizedQueue of thread jobs currently being worked on by this
-        worker.""")
-
-    UUID = zope.interface.Attribute(
-        """The uuid.UUID that identifies this worker (where one instance ==
-        one process == one worker == one UUID).""")
-
-    engineUUID = zope.interface.Attribute(
-        """The uuid.UUID of the engine that is running this worker, or None.""")
-
-    last_ping = zope.interface.Attribute(
-        """the datetime.datetime in the pytz.UTC timezone of the last ping.
-        This date should be updated anytime a worker accepts a job in a
-        reactor or thread queue; and whenever, during a poll,
-        (last_ping + ping_interval) <= now.""")
-
-    poll_seconds = zope.interface.Attribute(
-        """The number of seconds between the end of one worker poll and the
-        start of the next.""")
-
-    ping_interval = zope.interface.Attribute(
-        """The approximate maximum datetime.timedelta between pings before
-        a new last_ping should be recorded.""")
-
-    ping_death_interval = zope.interface.Attribute(
-        """the datetime.timedelta after the last_ping + ping_interval that
-        signifies the workers death.  That is, if (last_ping + ping_interval +
-        ping_death_interval) < now, the worker should be regarded as dead.
-        """)
-
-    completed = zope.interface.Attribute(
-        """The most recent jobs completed by this worker, in the order
-        from most recent `begin_after` to oldest.  ICompletedCollection.""")
-
 class IUUID(zope.interface.Interface):
     """A marker interface for the API of Ka-Ping Yee's uuid.UUID class.
     See http://zesty.ca/python/uuid.html """

Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/job.py	2008-03-20 23:59:06 UTC (rev 84809)
@@ -85,7 +85,7 @@
                 quotas = self.queue.quotas
                 for name in value:
                     if name not in quotas:
-                        raise ValueError('quota name not defined in queue')
+                        raise ValueError('unknown quota name', name)
             else:
                 raise zc.async.interfaces.BadStatusError(
                     'can only set quota_names when a job has NEW or PENDING '

Modified: zc.async/branches/dev/src/zc/async/queue.py
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.py	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/queue.py	2008-03-20 23:59:06 UTC (rev 84809)
@@ -2,6 +2,7 @@
 import bisect
 import pytz
 import persistent
+import persistent.interfaces
 import ZODB.interfaces
 import BTrees.OOBTree
 import BTrees.Length
@@ -17,7 +18,13 @@
 
 _marker = object()
 
+# purely optional
+ at zope.interface.implementer(zc.async.interfaces.IQueue)
+ at zope.component.adapter(persistent.interfaces.IPersistent)
+def getDefaultQueue(obj):
+    return ZODB.interfaces.IConnection(obj).root()[zc.async.interfaces.KEY]['']
 
+
 class DispatcherAgents(zc.async.utils.Dict):
     zope.interface.implements(zc.async.interfaces.IDispatcherAgents)
 

Modified: zc.async/branches/dev/src/zc/async/queue.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.txt	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/queue.txt	2008-03-20 23:59:06 UTC (rev 84809)
@@ -315,7 +315,7 @@
     >>> job4.quota_names = ('content catalog',)
     Traceback (most recent call last):
     ...
-    ValueError: quota name not defined in queue
+    ValueError: ('unknown quota name', 'content catalog')
 
 The same kind of error happens if we try to put a job with unknown quota
 names in a queue.

Modified: zc.async/branches/dev/src/zc/async/tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/tests.py	2008-03-20 18:10:09 UTC (rev 84808)
+++ zc.async/branches/dev/src/zc/async/tests.py	2008-03-20 23:59:06 UTC (rev 84809)
@@ -46,6 +46,10 @@
         test.globs['db'].close()
         test.globs['storage'].close()
         test.globs['storage'].cleanup()
+    if 'async_storage' in test.globs:
+        test.globs['async_db'].close()
+        test.globs['async_storage'].close()
+        test.globs['async_storage'].cleanup()
 
 def test_instanceuuid():
     """This module provides access to a UUID that is intended to uniquely
@@ -91,10 +95,10 @@
 
     """
 def test_long_to_dt():
-    """The utils module provides two cool methods to convert a date to a long
-    and back again.  Dates in the future get smaller and smaller, so dates
-    are arranged from newest to oldest in a BTree.  It leaves an extra 4 bits
-    at the bottom.  It can convert all possible datetimes.
+    """The utils module provides two methods to convert a date to a long
+    and back again.  Dates in the future get smaller and smaller, so
+    dates are arranged from newest to oldest in a BTree.  It leaves an
+    extra 4 bits at the bottom.  It can convert all possible datetimes.
     
     >>> from zc.async.utils import long_to_dt, dt_to_long
     >>> import datetime
@@ -122,6 +126,7 @@
             'agent.txt',
             'dispatcher.txt',
             'README.txt',
+            'README_2.txt',
             setUp=modSetUp, tearDown=modTearDown,
             optionflags=doctest.INTERPRET_FOOTNOTES),
         ))
