[Checkins] SVN: zc.async/trunk/src/zc/async/ Add a number of powerful monitoring and introspection/diagnosis tools.

Gary Poster gary at modernsongs.com
Wed Sep 10 23:58:10 EDT 2008


Log message for revision 91050:
  Add a number of powerful monitoring and introspection/diagnosis tools.
  
  Tools are in monitordb.py.
  
  Work also led to the following changes.
  
  New ``filter`` on agent is preferred over ``chooser``. ``_choose`` is a
  specific subclass hook now.
  
  Worked on making tests less prone to spurious timing failures (at the expense
  of making the tests significantly slower).
  
  Moved legacy code, of which agent.chooseFirst is now an example, to a new
  legacy module
  
  Refactored the monitor code so that the monitor functions can be used from
  Python directly more easily, in addition to through zc.z3monitor.  This
  pattern is also used for the new monitordb functions (see the sketch below).
  
  Moved some functions, now used by multiple modules, to utils.
  
  Still more to do before 1.5 (see TODO).
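
  For reference, a minimal sketch of the direct-from-Python usage that the
  monitor refactoring enables, assuming a dispatcher is already running in the
  current process:

      import zc.async.monitor

      # The module-level monitor functions now return plain data structures
      # instead of pre-encoded JSON strings; the zc.z3monitor plumbing does
      # the JSON encoding.
      status_info = zc.async.monitor.status()       # mapping of dispatcher status
      active = zc.async.monitor.jobs(agent='main')  # active job ids for 'main'

      # The same JSON encoding the monitor server uses remains available.
      print zc.async.monitor.encoder.encode(status_info)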
  
  

Changed:
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/README_1.txt
  U   zc.async/trunk/src/zc/async/README_2.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/agent.py
  U   zc.async/trunk/src/zc/async/agent.txt
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/dispatcher.txt
  U   zc.async/trunk/src/zc/async/dispatcher.zcml
  U   zc.async/trunk/src/zc/async/interfaces.py
  U   zc.async/trunk/src/zc/async/job.py
  A   zc.async/trunk/src/zc/async/legacy.py
  U   zc.async/trunk/src/zc/async/monitor.py
  U   zc.async/trunk/src/zc/async/monitor.txt
  A   zc.async/trunk/src/zc/async/monitordb.py
  A   zc.async/trunk/src/zc/async/monitordb.txt
  U   zc.async/trunk/src/zc/async/parallel_serial.txt
  U   zc.async/trunk/src/zc/async/testing.py
  U   zc.async/trunk/src/zc/async/utils.py
  U   zc.async/trunk/src/zc/async/z3tests.py

-=-
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -70,6 +70,17 @@
 - zc.async events inherit from 'zc.component.interfaces.IObjectEvent' instead
   of a zc.async specific IObjectEvent (thanks to Satchit Haridas).
 
+- Added new monitoring and introspection tools XXX
+
+- Added new preferred way of filtering agent choices: ``filter`` attribute XXX
+
+- Deprecated ``agent.chooseFirst`` XXX
+
+- Moved deprecated legacy code to new ``legacy`` module.
+
+- Tried to significantly reduce the chance of spurious timing errors in the
+  tests, at the expense of causing the tests to take longer to run.
+
 1.4.1 (2008-07-30)
 ==================
 

Modified: zc.async/trunk/src/zc/async/README_1.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_1.txt	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/README_1.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -875,7 +875,7 @@
     >>> import zc.async.agent
     >>> agent = zc.async.agent.Agent()
     >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
-    >>> agent.chooser is zc.async.agent.chooseFirst
+    >>> agent.filter is None
     True
     >>> agent.size
     3
@@ -1171,13 +1171,14 @@
 
 .. [#stop_usage_reactor]
 
+    >>> threads = []
+    >>> for queue_pools in dispatcher.queues.values():
+    ...     for pool in queue_pools.values():
+    ...         threads.extend(pool.threads)
     >>> reactor.stop()
     >>> zc.async.testing.wait_for_deactivation(dispatcher)
-    >>> for queue_pools in dispatcher.queues.values():
-    ...     for name, pool in queue_pools.items():
-    ...         pool.setSize(0)
-    ...         for thread in pool.threads:
-    ...             thread.join(3)
+    >>> for thread in threads:
+    ...     thread.join(3)
     ...
     >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
     {'failed': 2,

Modified: zc.async/trunk/src/zc/async/README_2.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_2.txt	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/README_2.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -442,47 +442,32 @@
 
 We currently have an agent that simply asks for the next available FIFO job.
 We are using an agent implementation that allows you to specify a callable to
-choose the job.  That callable is now zc.async.agent.chooseFirst.
+filter the job.  That callable is now None.
 
-    >>> agent.chooser is zc.async.agent.chooseFirst
+    >>> agent.filter is None
     True
 
-Here's the entire implementation of that function::
+What does a filter do?  A filter takes a job and returns a value evaluated as a
+boolean.  For instance, let's say we always wanted a certain number of threads
+available for working on a particular call; for the purpose of example, we'll
+use ``operator.mul``, though a more real-world example might be a network call
+or a particular call in your application.
 
-    def chooseFirst(agent):
-        return agent.queue.claim()
-
-What would another agent do?  Well, it might pass a filter function to
-``claim``.  This function takes a job and returns a value evaluated as a
-boolean.  For instance, let's say we always wanted a certain number of
-threads available for working on a particular call; for the purpose of
-example, we'll use ``operator.mul``, though a more real-world example
-might be a network call or a particular call in your application.
-
     >>> import operator
-    >>> def chooseMul(agent):
-    ...     return agent.queue.claim(lambda job: job.callable is operator.mul)
+    >>> def chooseMul(job):
+    ...     return job.callable == operator.mul
     ...
 
-Another variant would prefer operator.mul, but if one is not in the queue,
-it will take any.
+You might want something more sophisticated, such as preferring operator.mul
+jobs but accepting any job if none is in the queue, or some other priority
+policy.  To do this, you'll want to write your own agent--possibly
+inheriting from the provided one and overriding ``_choose``.
 
-    >>> def preferMul(agent):
-    ...     res = agent.queue.claim(lambda job: job.callable is operator.mul)
-    ...     if res is None:
-    ...         res = agent.queue.claim()
-    ...     return res
-    ...
+Let's set up another agent, in addition to the default one, that has
+the ``chooseMul`` policy.
 
-Other approaches might look at the current jobs in the agent, or the agent's
-dispatcher, and decide what jobs to prefer on that basis.  The agent should
-support many ideas.
+    >>> agent2 = dispatcher_agents['mul'] = zc.async.agent.Agent(filter=chooseMul)
 
-Let's set up another agent, in addition to the ``chooseFirst`` one, that has
-the ``preferMul`` policy.
-
-    >>> agent2 = dispatcher_agents['mul'] = zc.async.agent.Agent(preferMul)
-
 Another characteristic of agents is that they specify how many jobs they
 should pick at a time.  The dispatcher actually adjusts the size of the
 ZODB connection pool to accommodate its agents' size.  The default is 3.

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -1,9 +1,8 @@
 - fix up tips so that it looks better
 - write a zc.buildout/grok quickstart
-- zc.z3monitor:
-  + asyncdb count pending callable:foo*
-  + asyncdb count completed agent:entry5
-  + asyncdb countall completed
+- zc.monitor instead of zc.z3monitor
+- improvements to thread teardown
+- docstrings for monitordb functions
 
 Improvements
 

Modified: zc.async/trunk/src/zc/async/agent.py
===================================================================
--- zc.async/trunk/src/zc/async/agent.py	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/agent.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -14,25 +14,53 @@
 import persistent
 import datetime
 
+import rwproperty
 import zope.interface
 import zope.component
 
 import zc.async.interfaces
 import zc.async.utils
 
+from zc.async.legacy import chooseFirst
 
-def chooseFirst(agent):
-    return agent.queue.claim()
-
-
 class Agent(zc.async.utils.Base):
 
     zope.interface.implements(zc.async.interfaces.IAgent)
 
-    def __init__(self, chooser=None, size=3):
-        if chooser is None:
-            chooser = chooseFirst
+    _chooser = _filter = None
+
+    @property
+    def filter(self):
+        return self._filter
+    @rwproperty.setproperty
+    def filter(self, value):
+        if value is not None and self.chooser is not None:
+            raise ValueError('cannot set both chooser and filter to non-None')
+        self._filter = value
+
+    @property
+    def chooser(self):
+        res = self._chooser
+        if res is None: # legacy support
+            res = self.__dict__.get('chooser')
+        return res
+    @rwproperty.setproperty
+    def chooser(self, value):
+        if value is not None and self.filter is not None:
+            raise ValueError('cannot set both chooser and filter to non-None')
+        self._chooser = value
+        if 'chooser' in self.__dict__:
+            del self.__dict__['chooser']
+        if value is None:
+            zope.interface.alsoProvides(self, zc.async.interfaces.IFilterAgent)
+        else:
+            zope.interface.directlyProvides(self,
+                zope.interface.directlyProvidedBy(self) -
+                zc.async.interfaces.IFilterAgent)
+
+    def __init__(self, chooser=None, filter=None, size=3):
         self.chooser = chooser
+        self.filter = filter
         self.size = size
         self._data = zc.queue.PersistentQueue()
         self._data.__parent__ = self
@@ -79,8 +107,8 @@
             # activated but it changed beneath us.  If the ZODB grows a gesture
             # to cause this, use it.
             return None
-        if len(self._data) < self.size:
-            res = self.chooser(self)
+        if len(self._data) < self.size: # MVCC can cause error here...
+            res = self._choose()
             if res is not None:
                 res.parent = self
                 self._data.put(res)
@@ -88,6 +116,12 @@
             res = None
         return res
 
+    def _choose(self): # hook point for subclass.  Override if desired.
+        if self.chooser is not None:
+            return self.chooser(self)
+        else:
+            return self.queue.claim(self.filter)
+
     def jobCompleted(self, job):
         self.remove(job)
         self.completed.add(job)
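
A minimal sketch of the new ``_choose`` hook in use, recreating the old
``preferMul`` behavior by subclassing instead of supplying a ``chooser`` (the
class name here is illustrative):

    import operator

    import zc.async.agent


    class PreferMulAgent(zc.async.agent.Agent):
        # Prefer ``operator.mul`` jobs, but take any available job if no
        # ``operator.mul`` job is in the queue.
        def _choose(self):
            res = self.queue.claim(lambda job: job.callable is operator.mul)
            if res is None:
                res = self.queue.claim()
            return res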

Modified: zc.async/trunk/src/zc/async/agent.txt
===================================================================
--- zc.async/trunk/src/zc/async/agent.txt	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/agent.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -4,9 +4,10 @@
 Arguably the most interesting method to control is ``claimJob``.  It is 
 responsible for getting the next job from the queue.
 
-The default implementation in zc.async.agent allows you to pass in a callable
-that, given the agent, claims and returns the desired job.  The default
-callable simply asks for the next job.
+The default implementation in zc.async.agent allows you to control this in one
+of two ways: a ``filter`` or a ``chooser``.  We will examine these later.  The
+default behavior is to accept all jobs (constrained by the agent's size, as
+also seen below).
 
 Let's take a quick look at how the agent works.  Let's imagine we have a
 queue with a dispatcher with an agent [#setUp]_.
@@ -124,9 +125,127 @@
     >>> len(agent)
     1
 
-This particular agent invites you to provide a function to choose jobs.
-The default one simply chooses the first available job in the queue.
+Let's clean out the queue and agent so our next section has a clean slate.
 
+    >>> while len(queue):
+    ...     ignore = queue.pull(0)
+    ...
+    >>> job5()
+    42
+    >>> len(queue)
+    0
+    >>> len(agent)
+    0
+
+Filters and Choosers
+====================
+
+This particular agent invites you to provide a function to choose jobs.  As
+mentioned above, this can either be a "filter" or a "chooser".  Filters are
+preferred because they allow for better reporting in monitoring code.
+
+Our agent has a ``None`` filter at the moment, and provides a special
+interface indicating that it uses filters:
+``zc.async.interfaces.IFilterAgent``.
+
+    >>> print agent.filter
+    None
+    >>> import zc.async.interfaces
+    >>> zc.async.interfaces.IFilterAgent.providedBy(agent)
+    True
+
+We can define a filter.  The ``filter`` attribute, if not None, must simply
+be a callable that takes a job and returns ``True`` if the agent will accept
+the job, and ``False`` otherwise.  Let's define a filter that accepts
+only jobs with the ``mock_work`` callable and put it on our agent.
+
+    >>> def mock_work_filter(job):
+    ...     return job.callable == mock_work
+    ...
+    >>> agent.filter = mock_work_filter
+    >>> zc.async.interfaces.IFilterAgent.providedBy(agent)
+    True
+    >>> transaction.commit()
+
+Now let's add some jobs.
+
+    >>> import operator
+    >>> import zc.async.job
+    >>> job6 = queue.put(mock_work)
+    >>> job7 = queue.put(zc.async.job.Job(operator.mul, 42, 2))
+    >>> job8 = queue.put(mock_work)
+    >>> job9 = queue.put(zc.async.job.Job(operator.mul, 42, 2))
+
+The agent will claim the mock_work jobs, but not the other two.
+    
+    >>> job6 is agent.claimJob()
+    True
+    >>> job8 is agent.claimJob()
+    True
+    >>> print agent.claimJob()
+    None
+    >>> len(agent)
+    2
+    >>> job6()
+    42
+    >>> job8()
+    42
+    >>> len(agent)
+    0
+    >>> print agent.claimJob()
+    None
+
+Filters are used for monitoring to determine how many jobs in a queue a given
+agent might help to perform.  Therefore, typically a filter should not take the
+state of an agent into account when it decides whether or not to accept a job.
+That should happen in a subclass' ``claimJob`` custom implementation...or in
+a ``chooser``.
+
+The ``chooser`` attribute is the original approach of the agent implementation
+before the ``filter`` was added.  Its primary failing is that it is not
+designed to allow easy introspection of what jobs an agent would be willing to
+do, because the code that filtered also mutated the queue.  That said, it can
+still come in handy if you want to have a chooser that takes the agent's state
+into account.
+
+In this implementation, you cannot set both a filter and a chooser to non-None.
+
+    >>> def choose_add(agent):
+    ...     return agent.queue.claim(lambda j: j.callable == operator.add)
+    ...
+    >>> agent.chooser = choose_add
+    Traceback (most recent call last):
+    ...
+    ValueError: cannot set both chooser and filter to non-None
+    >>> agent.filter = None
+    >>> zc.async.interfaces.IFilterAgent.providedBy(agent)
+    True
+    >>> agent.chooser = choose_add
+    >>> zc.async.interfaces.IFilterAgent.providedBy(agent)
+    False
+    >>> agent.filter = mock_work_filter
+    Traceback (most recent call last):
+    ...
+    ValueError: cannot set both chooser and filter to non-None
+    >>> transaction.commit()
+
+Now we have a chooser that only wants to perform jobs with an operator.add
+callable.
+
+    >>> len(queue)
+    2
+    >>> print agent.claimJob()
+    None
+    >>> job10 = queue.put(zc.async.job.Job(operator.add, 41, 1))
+    >>> agent.claimJob() is job10
+    True
+    >>> len(queue)
+    2
+    >>> len(agent)
+    1
+
+Again, filters are preferred.
+
 .. [#setUp] First we'll get a database and the necessary registrations.
 
     >>> from ZODB.tests.util import DB

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -217,6 +217,10 @@
                 self.queue.put(None)
         return size - old # size difference
 
+def getId(obj):
+    dbname = getattr(obj._p_jar.db(), 'database_name', None)
+    return (ZODB.utils.u64(obj._p_oid), dbname)
+
 # this is mostly for testing, though ``get`` comes in handy generally
 
 _dispatchers = {}
@@ -354,10 +358,7 @@
                     pools = self.queues[queue.name] = {}
                 for name, agent in da.items():
                     job_info = []
-                    active_jobs = [
-                        (ZODB.utils.u64(job._p_oid),
-                         getattr(job._p_jar.db(), 'database_name', None))
-                         for job in agent]
+                    active_jobs = [getId(job) for job in agent]
                     agent_info = queue_info[name] = {
                         'size': None, 'len': None, 'error': None,
                         'new jobs': job_info, 'active jobs': active_jobs}
@@ -398,9 +399,7 @@
                                     'thread': None,
                                     'reassigned': False}
                             started_jobs.append(info)
-                            dbname = getattr(
-                                job._p_jar.db(), 'database_name', None)
-                            jobid = (ZODB.utils.u64(job._p_oid), dbname)
+                            jobid = uoid, dbname = getId(job)
                             self.jobs[jobid] = info
                             job_info.append(jobid)
                             pool.queue.put(

Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/dispatcher.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -267,14 +267,14 @@
     ...
     >>> print info1.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     starting in thread ...: <zc.async.job.Job (oid ..., db 'unnamed')
-                             ``<built-in function mul>(14, 3)``>
+                             ``operator.mul(14, 3)``>
     >>> print info2.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     <zc.async.job.Job (oid ..., db 'unnamed')
-     ``<built-in function mul>(14, 3)``> succeeded with result:
+     ``operator.mul(14, 3)``> succeeded with result:
     42
     >>> print info3.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     completed in thread ...: <zc.async.job.Job (oid ..., db 'unnamed')
-                             ``<built-in function mul>(14, 3)``>
+                             ``operator.mul(14, 3)``>
 
     >>> print debug.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     poll ...:
@@ -291,7 +291,7 @@
     >>> pprint.pprint(info)
     ... # doctest: +ELLIPSIS
     {'agent': 'main',
-     'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``<built-in function mul>(14, 3)``>",
+     'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``operator.mul(14, 3)``>",
      'completed': datetime.datetime(...),
      'failed': False,
      'poll id': ...,
@@ -548,7 +548,7 @@
     ...
     >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     <zc.async.job.Job (oid ..., db 'unnamed')
-     ``<built-in function mul>(14, None)``> failed with traceback:
+     ``operator.mul(14, None)``> failed with traceback:
     *--- Failure #... (pickled) ---
     .../zc/async/job.py:...: __call__(...)
      [ Locals ]...

Modified: zc.async/trunk/src/zc/async/dispatcher.zcml
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.zcml	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/dispatcher.zcml	2008-09-11 03:58:09 UTC (rev 91050)
@@ -2,7 +2,9 @@
 <configure xmlns="http://namespaces.zope.org/zope">
     <include file="configure.zcml" />
     <utility component=".monitor.async"
-             provides="zc.z3monitor.interfaces.IZ3MonitorPlugin" name="async" />
-    <!-- maybe could divide up queue_installer so queues collection is added
-         here? -->
+             provides="zc.z3monitor.interfaces.IZ3MonitorPlugin"
+             name="async" />
+    <utility component=".monitordb.asyncdb"
+             provides="zc.z3monitor.interfaces.IZ3MonitorPlugin"
+             name="asyncdb" />
 </configure>

Modified: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/interfaces.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -341,7 +341,25 @@
     def index(item):
         """return index, or raise ValueError if item is not in queue"""
 
+class IFilterAgent(IAgent):
+    """An agent that uses a filter to claim jobs (see ``IQueue.claim``).
+    
+    This sort of agent can easily report what jobs it *could* take because the
+    filter should simply return a boolean and not change any state.
+    """
+    
+    def filter(job):
+        """return whether the agent could perform the job.
+        
+        This decision should ignore whether the agent has any room (that is, if
+        len(agent) < agent.size).
+        
+        As a special case, if the ``filter`` attribute on the agent is None,
+        this should be considered to be a do-nothing filter--that is, the agent
+        accepts all jobs.
+        """
 
+
 class IQueue(zc.queue.interfaces.IQueue):
 
     parent = zope.interface.Attribute(

Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/job.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -35,40 +35,8 @@
 import zc.async.utils
 import zc.async
 
-def _repr(obj):
-    if isinstance(obj, persistent.Persistent):
-        dbname = "?"
-        if obj._p_jar is not None:
-            dbname = getattr(obj._p_jar.db(), 'database_name', "?")
-            if dbname != '?':
-                dbname = repr(dbname)
-        if obj._p_oid is not None:
-            oid = ZODB.utils.u64(obj._p_oid)
-        else:
-            oid = '?'
-        return '%s.%s (oid %s, db %s)' % (
-            obj.__class__.__module__,
-            obj.__class__.__name__,
-            oid,
-            dbname)
-    elif isinstance(obj, types.FunctionType):
-        return '%s.%s' % (obj.__module__, obj.__name__)
-    else:
-        return repr(obj)
+from zc.async.legacy import success_or_failure
 
-# this is kept so that legacy databases can keep their references to this
-# function
-def success_or_failure(success, failure, res):
-    callable = None
-    if isinstance(res, twisted.python.failure.Failure):
-        if failure is not None:
-            callable = failure
-    elif success is not None:
-        callable = success
-    if callable is None:
-        return res
-    return callable(res)
-
 class RetryCommonFourTimes(persistent.Persistent): # default
     zope.component.adapts(zc.async.interfaces.IJob)
     zope.interface.implements(zc.async.interfaces.IRetryPolicy)
@@ -480,17 +448,20 @@
 
     def __repr__(self):
         try:
-            call = _repr(self._callable_root)
+            call = zc.async.utils.custom_repr(self._callable_root)
             if self._callable_name is not None:
                 call += ' :' + self._callable_name
-            args = ', '.join(_repr(a) for a in self.args)
-            kwargs = ', '.join(k+"="+_repr(v) for k, v in self.kwargs.items())
+            args = ', '.join(zc.async.utils.custom_repr(a) for a in self.args)
+            kwargs = ', '.join(
+                k + "=" + zc.async.utils.custom_repr(v)
+                for k, v in self.kwargs.items())
             if args:
                 if kwargs:
                     args += ", " + kwargs
             else:
                 args = kwargs
-            return '<%s ``%s(%s)``>' % (_repr(self), call, args)
+            return '<%s ``%s(%s)``>' % (
+                zc.async.utils.custom_repr(self), call, args)
         except (TypeError, ValueError, AttributeError):
             # broken reprs are a bad idea; they obscure problems
             return super(Job, self).__repr__()

Added: zc.async/trunk/src/zc/async/legacy.py
===================================================================
--- zc.async/trunk/src/zc/async/legacy.py	                        (rev 0)
+++ zc.async/trunk/src/zc/async/legacy.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -0,0 +1,20 @@
+# This file contains code that only exists for backwards compatibility with
+# previous versions of zc.async.  Typically, these definitions are imported
+# with ``from zc.async.legacy import EXAMPLE`` so that database references
+# can find the code in the old locations.
+
+import twisted.python.failure
+
+def success_or_failure(success, failure, res):
+    callable = None
+    if isinstance(res, twisted.python.failure.Failure):
+        if failure is not None:
+            callable = failure
+    elif success is not None:
+        callable = success
+    if callable is None:
+        return res
+    return callable(res)
+
+def chooseFirst(agent):
+    return agent.queue.claim()

Modified: zc.async/trunk/src/zc/async/monitor.py
===================================================================
--- zc.async/trunk/src/zc/async/monitor.py	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/monitor.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -11,15 +11,21 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
+import datetime
 import re
-import datetime
+import types
+
 import pytz
-import uuid
+from uuid import UUID as uuid_UUID # we use this non-standard import spelling
+# because ``uuid`` is frequently an argument and UUID is a function defined
+# locally.
 import simplejson
-
 import zope.component
+import persistent.interfaces
 
 import zc.async.dispatcher
+import zc.async.interfaces
+import zc.async.utils
 
 _marker = object()
 class Encoder(simplejson.JSONEncoder):
@@ -43,21 +49,65 @@
             if obj.tzinfo is not None:
                 obj = obj.astimezone(pytz.UTC).replace(tzinfo=None)
             return obj.isoformat() + "Z"
-        elif isinstance(obj, uuid.UUID):
+        elif isinstance(obj, uuid_UUID):
             return str(obj)
+        elif zc.async.interfaces.IJob.providedBy(obj):
+            return zc.async.dispatcher.getId(obj)
+        elif getattr(obj, 'next', _marker) is not _marker:
+            # iterator.  Duck typing too fuzzy, practically?
+            return tuple(obj)
+        elif (isinstance(obj, (types.FunctionType, types.BuiltinFunctionType))
+              or persistent.interfaces.IPersistent.providedBy(obj)):
+            return zc.async.utils.custom_repr(obj)
         return simplejson.JSONEncoder.default(self, obj)
 
 encoder = Encoder(sort_keys=True, indent=4)
 
+def monitor(funcs, help_text, connection, cmd, raw, needs_db_connection=False):
+    if cmd is None:
+        res = help_text
+    else:
+        f = funcs.get(cmd)
+        if f is None:
+            res = '[Unknown tool name for this command: %s]' % (cmd,)
+        else:
+            args = []
+            kwargs = {}
+            for val in raw:
+                if ':' in val:
+                    key, val = val.split(':', 1)
+                    kwargs[key] = val
+                else:
+                    if kwargs:
+                        raise ValueError(
+                            'placeful modifiers must come before named '
+                            'modifiers')
+                    args.append(val)
+            if needs_db_connection:
+                dispatcher = zc.async.dispatcher.get()
+                conn = dispatcher.db.open()
+                try:
+                    res = f(conn, *args, **kwargs)
+                    if not isinstance(res, str):
+                        res = encoder.encode(res)
+                finally:
+                    conn.close()
+            else:
+                res = f(*args, **kwargs)
+                if not isinstance(res, str):
+                    res = encoder.encode(res)
+    connection.write(res)
+    connection.write('\n')
 
+
 def status(uuid=None):
     """Get a mapping of general zc.async dispatcher information.
 
     'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED', where
     'STUCK' means the poll is past due."""
     if uuid is not None:
-        uuid = uuid.UUID(uuid)
-    return encoder.encode(zc.async.dispatcher.get(uuid).getStatusInfo())
+        uuid = uuid_UUID(uuid)
+    return zc.async.dispatcher.get(uuid).getStatusInfo()
 
 def jobs(queue=None, agent=None, uuid=None):
     """Show active jobs in worker threads as of the instant.
@@ -80,9 +130,8 @@
         async jobs queue: agent:main
         (results filtered to queue named '' and agent named 'main')"""
     if uuid is not None:
-        uuid = uuid.UUID(uuid)
-    return encoder.encode(
-        zc.async.dispatcher.get(uuid).getActiveJobIds(queue, agent))
+        uuid = uuid_UUID(uuid)
+    return zc.async.dispatcher.get(uuid).getActiveJobIds(queue, agent)
 
 def job(OID, database=None, uuid=None):
     """Local information about a job as of last poll, if known.
@@ -104,9 +153,8 @@
     converted back to an integer with ``ZODB.utils.u64``.
     """
     if uuid is not None:
-        uuid = uuid.UUID(uuid)
-    return encoder.encode(
-        zc.async.dispatcher.get(uuid).getJobInfo(long(OID), database))
+        uuid = uuid_UUID(uuid)
+    return zc.async.dispatcher.get(uuid).getJobInfo(long(OID), database)
 
 _find = re.compile(r'\d+[DHMS]').findall
 def _dt(s):
@@ -119,11 +167,11 @@
             vals = {}
             for val in _find(s.upper()):
                 vals[val[-1]] = int(val[:-1])
-            res = datetime.timedelta(
+            res = datetime.datetime.utcnow() - datetime.timedelta(
                 days=vals.get('D', 0),
                 hours=vals.get('H', 0),
                 minutes=vals.get('M', 0),
-                seconds=vals.get('S', 0)) + datetime.datetime.utcnow()
+                seconds=vals.get('S', 0))
     return res
 
 
@@ -166,13 +214,12 @@
     Example:
 
         async jobstats queue: agent:main since:1H
-        (results filtered to queue named '' and agent named 'main' from now
-         till one hour ago)"""
+        (results filtered to queue named '' and agent named 'main' from
+         one hour ago till now)"""
     if uuid is not None:
-        uuid = uuid.UUID(uuid)
-    return encoder.encode(
-        zc.async.dispatcher.get(uuid).getStatistics(
-            _dt(at), _dt(before), _dt(since), queue, agent))
+        uuid = uuid_UUID(uuid)
+    return zc.async.dispatcher.get(uuid).getStatistics(
+        _dt(at), _dt(before), _dt(since), queue, agent)
 
 def poll(at=None, before=None, uuid=None):
     """Get information about a single poll, defaulting to most recent.
@@ -200,11 +247,10 @@
         (get the poll information at five minutes ago or before)"""
     # TODO: parse at and before to datetimes
     if uuid is not None:
-        uuid = uuid.UUID(uuid)
+        uuid = uuid_UUID(uuid)
     info = zc.async.dispatcher.get(uuid).getPollInfo(_dt(at), _dt(before))
-    res = {'key': info.key, 'time': info.utc_timestamp.isoformat() + "Z",
-           'results': info}
-    return encoder.encode(res)
+    return {'key': info.key, 'time': info.utc_timestamp.isoformat() + "Z",
+            'results': info}
 
 def polls(at=None, before=None, since=None, count=None, uuid=None):
     """Get information about recent polls, defaulting to most recent.
@@ -237,20 +283,19 @@
 
     Example:
 
-        async polls before:5M since:10M
-        (get the poll information from 5 to 10 minutes ago)"""
+        async polls since:10M before:5M
+        (get the poll information from 10 to 5 minutes ago)"""
     if uuid is not None:
-        uuid = uuid.UUID(uuid)
+        uuid = uuid_UUID(uuid)
     if count is None:
         if since is None:
             count = 3
     else:
         count = int(count)
-    return encoder.encode(
-        [{'key': p.key, 'time': p.utc_timestamp.isoformat() + "Z",
-          'results': p}
-         for p in zc.async.dispatcher.get(uuid).iterPolls(
-            _dt(at), _dt(before), _dt(since), count)])
+    return [{'key': p.key, 'time': p.utc_timestamp.isoformat() + "Z",
+             'results': p}
+            for p in zc.async.dispatcher.get(uuid).iterPolls(
+               _dt(at), _dt(before), _dt(since), count)]
 
 # provide in async and separately:
 
@@ -260,9 +305,7 @@
 
 def UUID():
     """Get instance UUID in hex."""
-    res = zope.component.getUtility(zc.async.interfaces.IUUID)
-    if res is not None:
-        return str(res)
+    return str(zope.component.getUtility(zc.async.interfaces.IUUID))
 
 funcs = {}
 
@@ -287,45 +330,11 @@
 for f in status, jobs, job, jobstats, poll, polls, utcnow, UUID, help:
     funcs[f.__name__] = f
 
-def monitor(funcs, help, connection, cmd, raw):
-    if cmd is None:
-        res = help
-    else:
-        f = funcs.get(cmd)
-        if f is None:
-            res = '[Unknown async tool]'
-        else:
-            args = []
-            kwargs = {}
-            for val in raw:
-                if ':' in val:
-                    key, val = val.split(':', 1)
-                    kwargs[key] = val
-                else:
-                    if kwargs:
-                        raise ValueError(
-                            'placeful modifiers must come before named '
-                            'modifiers')
-                    args.append(val)
-            res = f(*args, **kwargs)
-    connection.write(res)
-    connection.write('\n')
-
 def async(connection, cmd=None, *raw):
-    """A collection of tools to monitor zc.async activity in this process.
+    """Monitor zc.async activity in this process.
 
     To see a list of async tools, use 'async help'.
 
     To learn more about an async monitor tool, use 'async help <tool name>'."""
     monitor(funcs, async.__doc__, connection, cmd, raw)
 
-def asyncdb(connection, cmd=None, *raw):
-    """A collection of tools to monitor zc.async activity in the database.
-
-    To see a list of asyncdb tools, use 'asyncdb help'.
-
-    To learn more about an asyncdb monitor tool, use 'asyncdb help <tool name>'.
-
-    ``asyncdb`` tools differ from ``async`` tools in that ``asyncdb`` tools
-    access the database, and ``async`` tools do not."""
-    monitor(dbfuncs, asyncdb.__doc__, connection, cmd, raw)
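
Because the refactored ``monitor`` helper is reusable, other command sets can
be built on the same dispatch pattern.  A rough sketch (the ``asyncping`` and
``ping`` names are illustrative, not part of zc.async):

    import zc.async.dispatcher
    import zc.async.monitor

    def ping():
        """Return the status mapping of the in-process dispatcher."""
        return zc.async.dispatcher.get().getStatusInfo()

    funcs = {'ping': ping}

    def asyncping(connection, cmd=None, *raw):
        """An example command set built on zc.async.monitor.monitor."""
        zc.async.monitor.monitor(funcs, asyncping.__doc__, connection, cmd, raw)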

Modified: zc.async/trunk/src/zc/async/monitor.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitor.txt	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/monitor.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -11,7 +11,7 @@
     >>> connection.test_input('help async\n')
     Help for async:
     <BLANKLINE>
-    A collection of tools to monitor zc.async activity in this process.
+    Monitor zc.async activity in this process.
     <BLANKLINE>
         To see a list of async tools, use 'async help'.
     <BLANKLINE>
@@ -174,8 +174,8 @@
         Example:
     <BLANKLINE>
             async jobstats queue: agent:main since:1H
-            (results filtered to queue named '' and agent named 'main' from now
-             till one hour ago) 
+            (results filtered to queue named '' and agent named 'main' from
+             one hour ago till now) 
     -> CLOSE
 
     >>> connection.test_input('async jobstats\n')
@@ -266,8 +266,8 @@
     <BLANKLINE>
         Example:
     <BLANKLINE>
-            async polls before:5M since:10M
-            (get the poll information from 5 to 10 minutes ago) 
+            async polls since:10M before:5M
+            (get the poll information from 10 to 5 minutes ago) 
     -> CLOSE
 
     >>> connection.test_input('async polls\n')

Added: zc.async/trunk/src/zc/async/monitordb.py
===================================================================
--- zc.async/trunk/src/zc/async/monitordb.py	                        (rev 0)
+++ zc.async/trunk/src/zc/async/monitordb.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -0,0 +1,389 @@
+import datetime
+import fnmatch
+import re
+from uuid import UUID # we use this non-standard import spelling because
+# ``uuid`` is frequently an argument
+
+import pytz
+import twisted.python.failure
+import ZODB.interfaces
+import ZODB.utils
+import zope.component
+
+import zc.async.dispatcher
+import zc.async.interfaces
+import zc.async.monitor
+
+_available_states = frozenset(
+    ('pending', 'assigned', 'active', 'callbacks', 'completed', 'succeeded',
+     'failed'))
+
+def _get_date_filter(name, value):
+    since = before = None
+    for o in value.split(','):
+        if o.startswith('since'):
+            if since is not None:
+                raise ValueError('only provide "since" once (%s)' % (name,))
+            since = zc.async.monitor._dt(o[5:]).replace(tzinfo=pytz.UTC)
+        elif o.startswith('before'):
+            if before is not None:
+                raise ValueError('only provide "before" once (%s)' % (name,))
+            before = zc.async.monitor._dt(o[6:]).replace(tzinfo=pytz.UTC)
+    return lambda j: ((since is None or getattr(j, name) > since) and
+                      (before is None or getattr(j, name) < before))
+
+def _jobs(context, states,
+         callable=None, queue=None, agent=None, requested_start=None,
+         start=None, end=None, callbacks_completed=None,
+         uuid=None):
+    conn = ZODB.interfaces.IConnection(context)
+    states = set(states)
+    unknown = states - _available_states
+    if unknown:
+        raise ValueError('Available states are %s (unknown: %s)' %
+                         (', '.join(sorted(_available_states)),
+                          ', '.join(sorted(unknown))))
+    completed = set(['completed', 'succeeded', 'failed']) & states
+    if len(completed) > 1:
+        raise ValueError(
+            'can only include zero or one of '
+            '"completed", "succeeded," or "failed" states.')
+    elif completed:
+        completed = iter(completed).next()
+    if not states:
+        raise ValueError('Specify at least one of the available states: %s' %
+                         (', '.join(sorted(_available_states)),))
+    pending = 'pending' in states
+    assigned = 'assigned' in states
+    active = 'active' in states
+    callbacks = 'callbacks' in states
+    agent_states = []
+    if assigned:
+        agent_states.append(zc.async.interfaces.ASSIGNED)
+    if active:
+        agent_states.append(zc.async.interfaces.ACTIVE)
+    if callbacks:
+        agent_states.append(zc.async.interfaces.CALLBACKS)
+    if uuid is not None:
+        if uuid.upper() == 'THIS':
+            uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+        else:
+            uuid = UUID(uuid)
+    filters = []
+    if callable is not None:
+        regex = fnmatch.translate(callable)
+        if '.' not in callable:
+            regex = r'(.*\.)?%s$' % (regex,)
+        callable = re.compile(regex).match
+        filters.append(
+            lambda j: callable(zc.async.utils.custom_repr(j.callable)))
+    if requested_start:
+        filters.append(_get_date_filter('begin_after', requested_start))
+    if start:
+        pending = False
+        filters.append(_get_date_filter('active_start', start))
+    if end:
+        pending = assigned = active = False
+        filters.append(_get_date_filter('active_end', end))
+    if callbacks_completed:
+        pending = assigned = active = callbacks = False
+        filters.append(
+            _get_date_filter('initial_callbacks_end', callbacks_completed))
+    if queue is not None:
+        queue = re.compile(fnmatch.translate(queue)).match
+    if agent is not None:
+        agent = re.compile(fnmatch.translate(agent)).match
+    queues = conn.root()[zc.async.interfaces.KEY]
+    for q_name, q in queues.items():
+        if queue and not queue(q_name):
+            continue
+        agent_filters = []
+        ignore_agent_filters = agent is None and uuid is None
+        if (assigned or active or callbacks or completed or
+            pending and not ignore_agent_filters):
+            if uuid is None:
+                das = q.dispatchers.values()
+            else:
+                das = (q.dispatchers[uuid],)
+            for da in das:
+                for a_name, a in da.items():
+                    if agent:
+                        if not agent(a_name):
+                            continue
+                    if agent or uuid is not None:
+                        if pending and not ignore_agent_filters:
+                            if zc.async.interfaces.IFilterAgent.providedBy(a):
+                                agent_filters.append(a.filter)
+                                ignore_agent_filters = (
+                                    ignore_agent_filters or a.filter is None)
+                            else:
+                                raise ValueError(
+                                    'can only find pending jobs for agent if '
+                                    'agent provides '
+                                    'zc.async.interfaces.IFilterAgent '
+                                    '(%s : %s : %s)' %
+                                    (q_name, da.UUID, a_name))
+                    if agent_states:
+                        for j in a:
+                            if j.status not in agent_states:
+                                continue
+                            for f in filters:
+                                if not f(j):
+                                    break
+                            else:
+                                yield j
+                    if completed:
+                        for j in a.completed:
+                            if completed!='completed':
+                                is_failure = isinstance(
+                                    j.result, twisted.python.failure.Failure)
+                                if (completed=='succeeded' and is_failure or
+                                    completed=='failed' and not is_failure):
+                                    continue
+                            for f in filters:
+                                if not f(j):
+                                    break
+                            else:
+                                yield j
+        if pending:
+            if not agent or agent_filters:
+                for j in q:
+                    if not ignore_agent_filters:
+                        for f in agent_filters:
+                            if f(j):
+                                break # this is a positive match
+                        else:
+                            continue
+                    for f in filters:
+                        if not f(j):
+                            break # this is a negative match
+                    else:
+                        yield j
+
+def jobs(context, *states, **kwargs):
+    """Return jobs in one or more states."""
+    return _jobs(context, states, **kwargs)
+
+def count(context, *states, **kwargs):
+    """Count jobs in one or more states."""
+    res = 0
+    for j in _jobs(context, states, **kwargs):
+        res += 1
+    return res
+
+_status_keys = {
+    zc.async.interfaces.NEW: 'new',
+    zc.async.interfaces.PENDING: 'pending',
+    zc.async.interfaces.ASSIGNED: 'assigned',
+    zc.async.interfaces.ACTIVE: 'active',
+    zc.async.interfaces.CALLBACKS: 'callbacks',
+    zc.async.interfaces.COMPLETED: 'completed'}
+
+def jobstats(context, *states, **kwargs):
+    """Return statistics about jobs in one or more states."""
+    now = datetime.datetime.now(pytz.UTC)
+    d = {'pending': 0, 'assigned': 0, 'active': 0, 'callbacks': 0,
+         'succeeded': 0, 'failed': 0}
+    longest_wait = longest_active = None
+    shortest_wait = shortest_active = None
+    for j in _jobs(context, states, **kwargs):
+        status = j.status 
+        if status == zc.async.interfaces.COMPLETED:
+            if isinstance(j.result, twisted.python.failure.Failure):
+                d['failed'] += 1
+            else:
+                d['succeeded'] += 1
+        else:
+            d[_status_keys[status]] += 1
+        wait = active = None
+        if j.active_start:
+            if j.active_end:
+                active = j.active_end - j.active_start
+            else:
+                active = now - j.active_start
+            if (longest_active is None or
+                longest_active[0] < active):
+                longest_active = active, j
+            if (shortest_active is None or
+                shortest_active[0] > active):
+                shortest_active = active, j
+            wait = j.active_start - j.begin_after
+        else:
+            wait = now - j.begin_after
+        if (longest_wait is None or
+            longest_wait[0] < wait):
+            longest_wait = wait, j
+        if (shortest_wait is None or
+            shortest_wait[0] > wait):
+            shortest_wait = wait, j
+    d['longest wait'] = longest_wait
+    d['longest active'] = longest_active
+    d['shortest wait'] = shortest_wait
+    d['shortest active'] = shortest_active
+    return d
+
+def jobsummary(job):
+    now = datetime.datetime.now(pytz.UTC)
+    wait = active = None
+    if job.active_start:
+        if job.active_end:
+            active = job.active_end - job.active_start
+        else:
+            active = now - job.active_start
+        wait = job.active_start - job.begin_after
+    else:
+        wait = now - job.begin_after
+    if isinstance(job.result, twisted.python.failure.Failure):
+        failed = True
+        result = job.result.getBriefTraceback()
+    else:
+        failed = False
+        result = zc.async.utils.custom_repr(job.result)
+    a = job.agent
+    if a:
+        agent = job.agent.name
+        dispatcher = a.parent.UUID
+    else:
+        agent = dispatcher = None
+    q = job.queue
+    if q:
+        queue = q.name
+    else:
+        queue = None
+    return {'repr': repr(job),
+            'args': list(job.args),
+            'kwargs': dict(job.kwargs),
+            'begin after': job.begin_after,
+            'active start': job.active_start,
+            'active end': job.active_end,
+            'initial callbacks end': job.initial_callbacks_end,
+            'wait': wait,
+            'active': active,
+            'status': _status_keys[job.status],
+            'failed': failed,
+            'result': result,
+            'quota names': job.quota_names,
+            'agent': agent,
+            'dispatcher': dispatcher,
+            'queue': queue,
+            'callbacks': list(job.callbacks)}
+
+def _get_job(context, oid, database=None):
+    conn = ZODB.interfaces.IConnection(context)
+    if database is None:
+        local_conn = conn
+    else:
+        local_conn = conn.get_connection(database)
+    return local_conn.get(ZODB.utils.p64(int(oid)))
+
+def traceback(context, oid, database=None, detail='default'):
+    """Return the traceback for the job identified by integer oid."""
+    detail = detail.lower()
+    if detail not in ('brief', 'default', 'verbose'):
+        raise ValueError('detail must be one of "brief," "default," "verbose"')
+    job = _get_job(context, oid, database)
+    if not isinstance(job.result, twisted.python.failure.Failure):
+        return None
+    return job.result.getTraceback(detail=detail)
+
+def job(context, oid, database=None):
+    """Return summary of job identified by integer oid."""
+    return jobsummary(_get_job(context, oid, database))
+
+def firstjob(context, *states, **kwargs):
+    """Return summary of first job found matching given filters.
+    """
+    for j in _jobs(context, states, **kwargs):
+        return jobsummary(j)
+    return None
+
+def UUIDs(context):
+    """Return all active UUIDs."""
+    conn = ZODB.interfaces.IConnection(context)
+    queues = conn.root()[zc.async.interfaces.KEY]
+    if not len(queues):
+        return []
+    queue = iter(queues.values()).next()
+    return [str(UUID) for UUID, da in queue.dispatchers.items()
+            if da.activated]
+
+def status(context, queue=None, agent=None, uuid=None):
+    """Return status of the agents of all queues and all active UUIDs."""
+    conn = ZODB.interfaces.IConnection(context)
+    if uuid is not None:
+        if uuid.upper() == 'THIS':
+            uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+        else:
+            uuid = UUID(uuid)
+    if queue is not None:
+        queue = re.compile(fnmatch.translate(queue)).match
+    if agent is not None:
+        agent = re.compile(fnmatch.translate(agent)).match
+    queues = conn.root()[zc.async.interfaces.KEY]
+    res = {}
+    if not len(queues):
+        return res
+    for q_name, q in queues.items():
+        if queue is None or queue(q_name):
+            das = {}
+            res[q_name] = {'len': len(q), 'dispatchers': das}
+            for da_uuid, da in q.dispatchers.items():
+                if da.activated and (uuid is None or da_uuid == uuid):
+                    agents = {}
+                    das[str(da_uuid)] = da_data = {
+                        'last ping': da.last_ping.value,
+                        'since ping': (datetime.datetime.now(pytz.UTC) -
+                                       da.last_ping.value),
+                        'dead': da.dead,
+                        'ping interval': da.ping_interval,
+                        'ping death interval': da.ping_death_interval,
+                        'agents': agents
+                        }
+                    for a_name, a in da.items():
+                        if agent is None or agent(a_name):
+                            agents[a_name] = d = {
+                                'size': a.size,
+                                'len': len(a)
+                                }
+                            if zc.async.interfaces.IFilterAgent.providedBy(a):
+                                d['filter'] = a.filter
+                            else:
+                                d['chooser'] = a.chooser
+    return res
+
+funcs = {}
+
+def help(context, cmd=None):
+    """Get help on an asyncdb monitor tool.
+
+    Usage is 'asyncdb help <tool name>' or 'asyncdb help'."""
+    if cmd is None:
+        res = [
+            "These are the tools available.  Usage for each tool is \n"
+            "'asyncdb <tool name> [modifiers...]'.  Learn more about each \n"
+            "tool using 'asyncdb help <tool name>'.\n"]
+        for nm, func in sorted(funcs.items()):
+            res.append('%s: %s' % (
+                nm, func.__doc__.split('\n', 1)[0]))
+        return '\n'.join(res)
+    f = funcs.get(cmd)
+    if f is None:
+        return 'Unknown asyncdb tool'
+    return f.__doc__
+
+for f in (
+    count, jobs, job, firstjob, traceback, jobstats, UUIDs, status, help):
+    name = f.__name__
+    funcs[name] = f
+
+def asyncdb(connection, cmd=None, *raw):
+    """Monitor and introspect zc.async activity in the database.
+
+    To see a list of asyncdb tools, use 'asyncdb help'.
+
+    To learn more about an asyncdb tool, use 'asyncdb help <tool name>'.
+
+    ``asyncdb`` tools differ from ``async`` tools in that ``asyncdb`` tools
+    access the database, and ``async`` tools do not."""
+    zc.async.monitor.monitor(
+        funcs, asyncdb.__doc__, connection, cmd, raw, needs_db_connection=True)
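
Since the monitordb functions accept any context adaptable to
``ZODB.interfaces.IConnection``, they can also be called directly from Python,
outside the zc.z3monitor plumbing.  A minimal sketch, borrowing the running
dispatcher's database just as the ``monitor`` helper does:

    import zc.async.dispatcher
    import zc.async.monitordb

    conn = zc.async.dispatcher.get().db.open()
    try:
        # Count pending jobs and summarize any failed ones.
        pending = zc.async.monitordb.count(conn, 'pending')
        failures = [zc.async.monitordb.jobsummary(j)
                    for j in zc.async.monitordb.jobs(conn, 'failed')]
    finally:
        conn.close()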

Added: zc.async/trunk/src/zc/async/monitordb.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitordb.txt	                        (rev 0)
+++ zc.async/trunk/src/zc/async/monitordb.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -0,0 +1,1197 @@
+Monitoring and Introspecting zc.async Database State
+====================================================
+
+The zc.async database activity can be monitored and introspected via
+zc.z3monitor plugins.  Let's imagine we have a connection over which we can
+send text messages to the monitor server [#setUp]_.
+
+All monitoring is done through the ``asyncdb`` command.  Here is its
+description, using the zc.z3monitor ``help`` command.
+
+    >>> connection.test_input('help asyncdb\n')
+    Help for asyncdb:
+    <BLANKLINE>
+    Monitor and introspect zc.async activity in the database.
+    <BLANKLINE>
+        To see a list of asyncdb tools, use 'asyncdb help'.
+    <BLANKLINE>
+        To learn more about an asyncdb tool, use 'asyncdb help <tool name>'.
+    <BLANKLINE>
+        ``asyncdb`` tools differ from ``async`` tools in that ``asyncdb`` tools
+        access the database, and ``async`` tools do not.
+    -> CLOSE
+
+As you can see, you use ``asyncdb help`` to get more information about each
+async-specific command.
+
+    >>> connection.test_input('asyncdb help\n')
+    These are the tools available.  Usage for each tool is 
+    'asyncdb <tool name> [modifiers...]'.  Learn more about each 
+    tool using 'asyncdb help <tool name>'.
+    <BLANKLINE>
+    UUIDs: Return all active UUIDs.
+    count: Count jobs in one or more states.
+    firstjob: Return summary of first job found matching given filters.
+    help: Get help on an asyncdb monitor tool.
+    job: Return summary of job identified by integer oid.
+    jobs: Return jobs in one or more states.
+    jobstats: Return statistics about jobs in one or more states.
+    status: Return status of the agents of all queues and all active UUIDs.
+    traceback: Return the traceback for the job identified by integer oid. 
+    -> CLOSE
+
+Let's take a quick run through these for an overview, and then we'll dig in
+just a bit.
+
+The ``UUIDs`` command is the simplest.  It returns all active UUIDs.
+
+    >>> connection.test_input('asyncdb UUIDs\n')
+    [
+        "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd"
+    ] 
+    -> CLOSE
+
+The ``status`` command is also simple, but very informative.  By default, it lists
+information for all queues.  For each queue, it lists the number of pending
+jobs and the active dispatchers; for each dispatcher, it lists status
+information, and each agent; for each agent, it lists the filter or chooser,
+the number of active jobs, and the size of the agent.
+
+    >>> connection.test_input('asyncdb status\n')
+    {
+        "": {
+            "dispatchers": {
+                "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd": {
+                    "agents": {
+                        "main": {
+                            "filter": null, 
+                            "len": 0, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:22.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 1.0
+                    }
+                }
+            }, 
+            "len": 0
+        }
+    } 
+    -> CLOSE
+
+The rest of the commands have to do with jobs, so we will add some.
+
+    >>> import zc.async.job
+    >>> import threading
+    >>> active_lock = threading.Lock()
+    >>> active_lock.acquire()
+    True
+    >>> callback_lock = threading.Lock()
+    >>> callback_lock.acquire()
+    True
+    >>> active_lock2 = threading.Lock()
+    >>> active_lock2.acquire()
+    True
+    >>> def raise_exception():
+    ...     raise RuntimeError('kumquat!')
+    ...
+    >>> def active_pause():
+    ...     active_lock.acquire()
+    ...     return 42
+    ...
+    >>> def active_pause2():
+    ...     active_lock2.acquire()
+    ...     return 42
+    ...
+    >>> def callback_pause(result):
+    ...     callback_lock.acquire()
+    ...     return repr(result)
+    ...
+    >>> def send_message():
+    ...     return "imagine this sent a message to another machine"
+    ...
+    >>> def sum_silly(*args, **kwargs):
+    ...     return sum(args, kwargs.get('start', 0))
+    ...
+    >>> job1 = queue.put(raise_exception)
+    >>> job2 = queue.put(active_pause)
+    >>> job3 = queue.put(zc.async.job.Job(sum_silly, 18, 18, start=6))
+    >>> job3_callback = job3.addCallback(callback_pause)
+    >>> job4 = queue.put(active_pause2)
+    >>> job5 = queue.put(send_message)
+    >>> job6 = queue.put(zc.async.job.Job(sum, (18, 18, 6)))
+    >>> import transaction
+    >>> transaction.commit()
+
+Notice the change in the queue's len:
+
+    >>> connection.test_input('asyncdb status\n')
+    {
+        "": {
+            "dispatchers": {
+                "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd": {
+                    "agents": {
+                        "main": {
+                            "filter": null, 
+                            "len": 0, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:22.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 1.0
+                    }
+                }
+            }, 
+            "len": 6
+        }
+    } 
+    -> CLOSE
+
+The ``jobs`` command lists all of the jobs in one or more states, identifying
+them with integer OIDs and database names.
+
+    >>> connection.test_input('asyncdb jobs pending\n') # doctest: +ELLIPSIS
+    [
+        [
+            30, 
+            "unnamed"
+        ], 
+        [
+            31, 
+            "unnamed"
+        ], 
+        [
+            32, 
+            "unnamed"
+        ], 
+        [
+            33, 
+            "unnamed"
+        ], 
+        [
+            34, 
+            "unnamed"
+        ], 
+        [
+            35, 
+            "unnamed"
+        ]
+    ] 
+    -> CLOSE
+
+You can also count and get stats for these, with the same arguments.
+
+    >>> connection.test_input('asyncdb count pending\n')
+    6 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb jobstats pending\n')
+    {
+        "active": 0, 
+        "assigned": 0, 
+        "callbacks": 0, 
+        "failed": 0, 
+        "longest active": null, 
+        "longest wait": [
+            {
+                "seconds": 0.0
+            }, 
+            [
+                30, 
+                "unnamed"
+            ]
+        ], 
+        "pending": 6, 
+        "shortest active": null, 
+        "shortest wait": [
+            {
+                "seconds": 0.0
+            }, 
+            [
+                30, 
+                "unnamed"
+            ]
+        ], 
+        "succeeded": 0
+    } 
+    -> CLOSE
+
+We can look at these jobs.
+
+    >>> connection.test_input('asyncdb job 30\n')
+    {
+        "active": null, 
+        "active end": null, 
+        "active start": null, 
+        "agent": null, 
+        "args": [], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [], 
+        "dispatcher": null, 
+        "failed": false, 
+        "initial callbacks end": null, 
+        "kwargs": {}, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.raise_exception()``>", 
+        "result": "None", 
+        "status": "pending", 
+        "wait": {
+            "seconds": 0.0
+        }
+    } 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb job 32\n')
+    {
+        "active": null, 
+        "active end": null, 
+        "active start": null, 
+        "agent": null, 
+        "args": [
+            18, 
+            18
+        ], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [
+            [
+                49, 
+                "unnamed"
+            ]
+        ], 
+        "dispatcher": null, 
+        "failed": false, 
+        "initial callbacks end": null, 
+        "kwargs": {
+            "start": 6
+        }, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 32, db 'unnamed') ``zc.async.doctest_test.sum_silly(18, 18, start=6)``>", 
+        "result": "None", 
+        "status": "pending", 
+        "wait": {
+            "seconds": 0.0
+        }
+    } 
+    -> CLOSE
+
+Specifying the database name is equivalent (note that, confusingly, this
+database's name is "unnamed").
+
+    >>> connection.test_input('asyncdb job 32 unnamed\n')
+    {
+        "active": null, 
+        "active end": null, 
+        "active start": null, 
+        "agent": null, 
+        "args": [
+            18, 
+            18
+        ], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [
+            [
+                49, 
+                "unnamed"
+            ]
+        ], 
+        "dispatcher": null, 
+        "failed": false, 
+        "initial callbacks end": null, 
+        "kwargs": {
+            "start": 6
+        }, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 32, db 'unnamed') ``zc.async.doctest_test.sum_silly(18, 18, start=6)``>", 
+        "result": "None", 
+        "status": "pending", 
+        "wait": {
+            "seconds": 0.0
+        }
+    } 
+    -> CLOSE
+
+We can also use ``firstjob`` to look at the first job in a given state.
+
+    >>> connection.test_input('asyncdb firstjob pending\n')
+    {
+        "active": null, 
+        "active end": null, 
+        "active start": null, 
+        "agent": null, 
+        "args": [], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [], 
+        "dispatcher": null, 
+        "failed": false, 
+        "initial callbacks end": null, 
+        "kwargs": {}, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.raise_exception()``>", 
+        "result": "None", 
+        "status": "pending", 
+        "wait": {
+            "seconds": 0.0
+        }
+    } 
+    -> CLOSE
+
+The ``jobs``, ``count``, ``jobstats`` and ``firstjob`` commands all support the
+same filtering options, which let you filter by dispatcher, by agent, by the
+name of the callable, or by various times.  Here's an example of filtering by
+callable name.
+
+    >>> connection.test_input('asyncdb count pending callable:send_message\n')
+    1 
+    -> CLOSE
+
+This can accept glob-style wildcards.
+
+    >>> connection.test_input('asyncdb count pending callable:su*\n')
+    2 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb count pending callable:s*\n')
+    3 
+    -> CLOSE
+
+If you do not include a ".", it matches only on the callable name.  If you
+include a ".", it matches on the fully-qualified name (that is, including the
+module).
+
+    >>> connection.test_input('asyncdb count pending callable:*test.send*\n')
+    1 
+    -> CLOSE
+    >>> connection.test_input('asyncdb count pending callable:__builtin__.*\n')
+    1 
+    -> CLOSE
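+
+The wildcard matching follows the familiar glob rules.  Just to illustrate the
+rule itself (this uses Python's ``fnmatch`` module purely for illustration; it
+is not a claim about how asyncdb implements the match):
+
+    >>> import fnmatch # illustration of glob-style matching only
+    >>> fnmatch.fnmatch('sum_silly', 'su*')
+    True
+    >>> fnmatch.fnmatch('send_message', 'su*')
+    False
+    >>> fnmatch.fnmatch('send_message', 's*')
+    True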
+
+Currently we don't have any jobs in any state other than pending.  Let's
+change that.
+
+    >>> connection.test_input(
+    ...     'asyncdb count assigned active callbacks completed\n')
+    0 
+    -> CLOSE
+    >>> ignore = reactor.time_flies(dispatcher.poll_interval)
+    >>> import time
+    >>> time.sleep(0.2)
+    >>> connection.test_input(
+    ...     'asyncdb count assigned active callbacks completed\n')
+    3 
+    -> CLOSE
+
+Now, because of our locks, we have one job in "active", one in "callbacks",
+and one in "completed".
+
+    >>> connection.test_input(
+    ...     'asyncdb count pending\n')
+    3 
+    -> CLOSE
+    >>> connection.test_input(
+    ...     'asyncdb count assigned\n')
+    0 
+    -> CLOSE
+    >>> connection.test_input(
+    ...     'asyncdb count active\n')
+    1 
+    -> CLOSE
+    >>> connection.test_input(
+    ...     'asyncdb count callbacks\n')
+    1 
+    -> CLOSE
+    >>> connection.test_input(
+    ...     'asyncdb count completed\n')
+    1 
+    -> CLOSE
+
+The "completed" category is divided into two synthetic states, "failed" and
+"succeeded".  The one completed job raised an exception, so it is "failed".
+
+    >>> connection.test_input(
+    ...     'asyncdb count failed\n')
+    1 
+    -> CLOSE
+    >>> connection.test_input(
+    ...     'asyncdb count succeeded\n')
+    0 
+    -> CLOSE
+
+Note that you cannot combine "completed," "succeeded," or "failed."
+
+    >>> connection.test_input(
+    ...     'asyncdb count succeeded failed\n') # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    ValueError: can only include zero or one of "completed", "succeeded," or "failed" states.
+    <BLANKLINE>
+    -> CLOSE
+
+    >>> connection.test_input(
+    ...     'asyncdb count failed completed\n') # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    ValueError: can only include zero or one of "completed", "succeeded," or "failed" states.
+    <BLANKLINE>
+    -> CLOSE
+
+    >>> connection.test_input(
+    ...     'asyncdb count succeeded completed\n') # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    ValueError: can only include zero or one of "completed", "succeeded," or "failed" states.
+    <BLANKLINE>
+    -> CLOSE
+
+    >>> connection.test_input(
+    ...     'asyncdb count succeeded completed failed\n') # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    ValueError: can only include zero or one of "completed", "succeeded," or "failed" states.
+    <BLANKLINE>
+    -> CLOSE
+
+The failed job has a traceback, which we can retrieve with ``traceback``.
+This is the last of the commands that asyncdb provides.
+
+    >>> connection.test_input('asyncdb traceback 30\n') # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+      File "...job.py", line ..., in __call__
+        ...
+      File "<doctest monitordb.txt...>", line ..., in raise_exception
+        raise RuntimeError('kumquat!')
+    exceptions.RuntimeError: kumquat!
+    <BLANKLINE>
+    -> CLOSE
+
+You can get a brief or a verbose version of the traceback.
+
+    >>> connection.test_input(
+    ...     'asyncdb traceback 30 detail:brief\n') # doctest: +ELLIPSIS
+    Traceback: exceptions.RuntimeError: kumquat!
+    ...job.py:...:__call__
+    <doctest monitordb.txt...>:...:raise_exception
+    <BLANKLINE>
+    -> CLOSE
+
+    >>> connection.test_input(
+    ...     'asyncdb traceback 30 detail:verbose\n') # doctest: +ELLIPSIS
+    *--- Failure #... (pickled) ---
+    ...job.py:...: __call__(...)
+     [ Locals ]
+      identifier : '"preparing for call o[...]'
+      prepare : '<function prepare at [...]'
+      res : 'None'
+      self : '<zc.async.job.Job (oi[...]'
+      args : '()'
+      tm : '<transaction._manager[...]'
+      kwargs : '{}'
+      setup_info : 'None'
+      effective_args : '[]'
+      data_cache : '{}'
+      statuses : "(u'new-status', u'ass[...]"
+      effective_kwargs : '{}'
+     ( Globals )
+    <doctest monitordb.txt...>:...: raise_exception(...)
+     [ Locals ]
+     ( Globals )
+    exceptions.RuntimeError: kumquat!
+    *--- End of Failure #... ---
+    <BLANKLINE>
+    -> CLOSE
+
+The brief version is included in the job summary view (see the "result" key).
+
+    >>> connection.test_input('asyncdb job 30\n') # doctest: +ELLIPSIS
+    {
+        "active": {
+            "seconds": 0.0
+        }, 
+        "active end": "2006-08-10T15:44:27.000211Z", 
+        "active start": "2006-08-10T15:44:27.000211Z", 
+        "agent": "main", 
+        "args": [], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [], 
+        "dispatcher": "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd", 
+        "failed": true, 
+        "initial callbacks end": "2006-08-10T15:44:27.000211Z", 
+        "kwargs": {}, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.raise_exception()``>", 
+        "result": "Traceback: exceptions.RuntimeError: kumquat!\n...job.py:...:__call__\n<doctest monitordb.txt...>:...:raise_exception\n", 
+        "status": "completed", 
+        "wait": {
+            "seconds": 4.0
+        }
+    } 
+    -> CLOSE
+
+Now, let's look at some of the date-based filters: ``requested_start``,
+``start``, ``end``, and ``callbacks_completed``.  They filter
+the results of the ``jobs``, ``count``, ``jobstats`` and ``firstjob`` tools.
+
+Each may be of the form "sinceINTERVAL", "beforeINTERVAL", or
+"sinceINTERVAL,beforeINTERVAL".  Intervals are of the format
+``[nD][nH][nM][nS]``, where "n" should be replaced with a positive integer, and
+"D," "H," "M," and "S" are literals standing for "days," "hours," "minutes,"
+and "seconds." For instance, you might use ``5M`` for five minutes, ``20S`` for
+twenty seconds, or ``1H30M`` for an hour and a half.
+
+For easier reading, it may help to mentally append "ago" to each interval.
+
+Here are some examples.
+
+* "start:since5S" could be read as "jobs that started five seconds ago or more
+  recently."
+
+* "requested_start:before1M" could be read as "jobs that were supposed to begin
+  one minute ago or longer". 
+
+* "end:since1M,before30S" could be read as "jobs that ended their
+  primary work (that is, not including callbacks) between thirty seconds and
+  one minute ago."
+
+* "callbacks_completed:before30S,since1M" could be read as "jobs that
+  completed the callbacks they had when first run between thirty seconds and
+  one minute ago."  (This also shows that the order of "before" and "since" do
+  not matter.)
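+
+To make the interval notation concrete, here is a rough sketch of how such a
+string maps onto a ``datetime.timedelta``.  This is purely illustrative--it is
+not the parser that asyncdb itself uses.
+
+    >>> import datetime
+    >>> import re
+    >>> def parse_interval(s):
+    ...     # illustrative only; asyncdb has its own parsing code
+    ...     match = re.match(
+    ...         r'^(?:(\d+)D)?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$', s)
+    ...     days, hours, minutes, seconds = (
+    ...         int(g or 0) for g in match.groups())
+    ...     return datetime.timedelta(
+    ...         days=days, hours=hours, minutes=minutes, seconds=seconds)
+    ...
+    >>> parse_interval('1H30M')
+    datetime.timedelta(0, 5400)
+    >>> parse_interval('20S')
+    datetime.timedelta(0, 20)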
+
+Let's let some more time pass, and then use some of these.
+
+    >>> ignore = reactor.time_flies(dispatcher.poll_interval)
+    >>> time.sleep(0.2)
+
+    >>> connection.test_input('asyncdb status\n')
+    {
+        "": {
+            "dispatchers": {
+                "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd": {
+                    "agents": {
+                        "main": {
+                            "filter": null, 
+                            "len": 3, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:22.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 11.0
+                    }
+                }
+            }, 
+            "len": 2
+        }
+    } 
+    -> CLOSE
+
+How many jobs are active?
+
+    >>> connection.test_input('asyncdb count active\n')
+    2 
+    -> CLOSE
+
+So, what are the active jobs that have been working for more than two seconds?
+
+    >>> connection.test_input('asyncdb jobs active start:before2S\n')
+    [
+        [
+            31, 
+            "unnamed"
+        ]
+    ] 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb job 31\n')
+    {
+        "active": {
+            "seconds": 6.0
+        }, 
+        "active end": null, 
+        "active start": "2006-08-10T15:44:27.000211Z", 
+        "agent": "main", 
+        "args": [], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [], 
+        "dispatcher": "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd", 
+        "failed": false, 
+        "initial callbacks end": null, 
+        "kwargs": {}, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 31, db 'unnamed') ``zc.async.doctest_test.active_pause()``>", 
+        "result": "None", 
+        "status": "active", 
+        "wait": {
+            "seconds": 4.0
+        }
+    } 
+    -> CLOSE
+
+And what are the active jobs that have been working for less than two seconds?
+
+    >>> connection.test_input('asyncdb jobs active start:since2S\n')
+    [
+        [
+            33, 
+            "unnamed"
+        ]
+    ] 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb job 33\n')
+    {
+        "active": {
+            "seconds": 1.0
+        }, 
+        "active end": null, 
+        "active start": "2006-08-10T15:44:32.000211Z", 
+        "agent": "main", 
+        "args": [], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [], 
+        "dispatcher": "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd", 
+        "failed": false, 
+        "initial callbacks end": null, 
+        "kwargs": {}, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 33, db 'unnamed') ``zc.async.doctest_test.active_pause2()``>", 
+        "result": "None", 
+        "status": "active", 
+        "wait": {
+            "seconds": 9.0
+        }
+    } 
+    -> CLOSE
+
+Let's let some more time pass, and then let the new job complete.
+
+    >>> ignore = reactor.time_flies(dispatcher.poll_interval)
+    >>> active_lock2.release()
+    >>> time.sleep(0.2)
+
+Now, how many jobs have completed?
+
+    >>> connection.test_input('asyncdb count completed\n')
+    2 
+    -> CLOSE
+
+What's the first (most recent) job that completed?  (Note that information
+about completed jobs is rotated out of the database periodically; by default,
+only about seven days' worth of completed jobs is kept.)
+
+    >>> connection.test_input('asyncdb firstjob completed\n')
+    {
+        "active": {
+            "seconds": 6.0
+        }, 
+        "active end": "2006-08-10T15:44:38.000211Z", 
+        "active start": "2006-08-10T15:44:32.000211Z", 
+        "agent": "main", 
+        "args": [], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [], 
+        "dispatcher": "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd", 
+        "failed": false, 
+        "initial callbacks end": "2006-08-10T15:44:38.000211Z", 
+        "kwargs": {}, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 33, db 'unnamed') ``zc.async.doctest_test.active_pause2()``>", 
+        "result": "42", 
+        "status": "completed", 
+        "wait": {
+            "seconds": 9.0
+        }
+    } 
+    -> CLOSE
+
+What's the first completed job that ended more than 5 seconds ago?
+
+    >>> connection.test_input('asyncdb firstjob completed end:before5S\n')
+    ... # doctest: +ELLIPSIS
+    {
+        "active": {
+            "seconds": 0.0
+        }, 
+        "active end": "2006-08-10T15:44:27.000211Z", 
+        "active start": "2006-08-10T15:44:27.000211Z", 
+        "agent": "main", 
+        "args": [], 
+        "begin after": "2006-08-10T15:44:23.000211Z", 
+        "callbacks": [], 
+        "dispatcher": "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd", 
+        "failed": true, 
+        "initial callbacks end": "2006-08-10T15:44:27.000211Z", 
+        "kwargs": {}, 
+        "queue": "", 
+        "quota names": [], 
+        "repr": "<zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.raise_exception()``>", 
+        "result": "Traceback: exceptions.RuntimeError: kumquat!\n...job.py:...:__call__\n<doctest monitordb.txt...>:...:raise_exception\n", 
+        "status": "completed", 
+        "wait": {
+            "seconds": 4.0
+        }
+    } 
+    -> CLOSE
+
+The last things to show are filtering by dispatcher, agent, or queue.  To
+demonstrate, we'll make the scenario a bit more complex by adding another
+dispatcher with a custom agent that uses a filter.
+
+    >>> import uuid
+    >>> uuid2 = uuid.UUID('282b5a6c-5a84-11dd-a9af-0017f2c49bdd')
+    >>> alt_dispatcher = zc.async.dispatcher.Dispatcher(db, reactor, uuid=uuid2)
+    >>> alt_dispatcher.activate()
+    >>> ignore = reactor.time_flies(1)
+
+Now we have a new dispatcher installed.
+
+    >>> connection.test_input('asyncdb UUIDs\n')
+    [
+        "282b5a6c-5a84-11dd-a9af-0017f2c49bdd", 
+        "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd"
+    ] 
+    -> CLOSE
+
+It doesn't have any agents yet, though, so it hasn't taken any jobs.
+
+    >>> connection.test_input('asyncdb status\n')
+    {
+        "": {
+            "dispatchers": {
+                "282b5a6c-5a84-11dd-a9af-0017f2c49bdd": {
+                    "agents": {}, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:38.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 1.0
+                    }
+                }, 
+                "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd": {
+                    "agents": {
+                        "main": {
+                            "filter": null, 
+                            "len": 2, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:22.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 17.0
+                    }
+                }
+            }, 
+            "len": 2
+        }
+    } 
+    -> CLOSE
+
+The majority of the asyncdb commands let you filter by uuid.  You can either
+provide the uuid string, or "THIS" if you want to look at this process's uuid.
+
+    >>> connection.test_input('asyncdb status uuid:THIS\n')
+    {
+        "": {
+            "dispatchers": {
+                "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd": {
+                    "agents": {
+                        "main": {
+                            "filter": null, 
+                            "len": 2, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:22.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 17.0
+                    }
+                }
+            }, 
+            "len": 2
+        }
+    } 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb status uuid:282b5a6c-5a84-11dd-a9af-0017f2c49bdd\n')
+    {
+        "": {
+            "dispatchers": {
+                "282b5a6c-5a84-11dd-a9af-0017f2c49bdd": {
+                    "agents": {}, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:38.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 1.0
+                    }
+                }
+            }, 
+            "len": 2
+        }
+    } 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb count active callbacks completed uuid:THIS\n')
+    4 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb count active callbacks completed uuid:282b5a6c-5a84-11dd-a9af-0017f2c49bdd\n')
+    0 
+    -> CLOSE
+
+Now we'll add an agent for this dispatcher.  It uses a filter that won't accept
+any of the pending jobs--it will only accept the ``active_pause2`` function
+created above.
+
+    >>> import zc.async.agent
+    >>> import zc.async.utils
+    >>> def filter(j):
+    ...     return (zc.async.utils.custom_repr(j.callable) ==
+    ...             'zc.async.doctest_test.active_pause2')
+    >>> agent = zc.async.agent.Agent(filter=filter)
+    >>> queue.dispatchers[alt_dispatcher.UUID]['filtered'] = agent
+    >>> transaction.commit()
+    >>> ignore = reactor.time_flies(1)
+
+The agent is now reported in the status.  Still no jobs in the agent.
+
+    >>> connection.test_input('asyncdb status\n')
+    {
+        "": {
+            "dispatchers": {
+                "282b5a6c-5a84-11dd-a9af-0017f2c49bdd": {
+                    "agents": {
+                        "filtered": {
+                            "filter": "zc.async.doctest_test.filter", 
+                            "len": 0, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:38.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 2.0
+                    }
+                }, 
+                "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd": {
+                    "agents": {
+                        "main": {
+                            "filter": null, 
+                            "len": 2, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:22.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 18.0
+                    }
+                }
+            }, 
+            "len": 2
+        }
+    } 
+    -> CLOSE
+
+We can filter by agent.
+
+    >>> connection.test_input('asyncdb count active callbacks agent:main\n')
+    2 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb count active callbacks agent:filtered\n')
+    0 
+    -> CLOSE
+
+We can also filter by agent for *pending* jobs.  This returns jobs that the
+agent would accept, according to its filter.
+
+    >>> connection.test_input('asyncdb count pending agent:main\n')
+    2 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb count pending agent:filtered\n')
+    0 
+    -> CLOSE
+
+For instance, here's a job that the filtered agent is willing to do.
+
+    >>> job7 = queue.put(active_pause2)
+    >>> transaction.commit()
+
+    >>> connection.test_input('asyncdb count pending agent:filtered\n')
+    1 
+    -> CLOSE
+
+Filtering by dispatcher uuid works as well.  Specifying a uuid alone is like
+specifying all of that dispatcher's agents.
+
+    >>> connection.test_input('asyncdb count pending uuid:THIS\n')
+    3 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb count pending uuid:282b5a6c-5a84-11dd-a9af-0017f2c49bdd\n')
+    1 
+    -> CLOSE
+
+Finally, we will add another queue so we can show filtering by queue.
+
+    >>> alt_queue = mapping['alt'] = zc.async.queue.Queue()
+    >>> transaction.commit()
+
+Initially, before another poll, the dispatchers will not be activated in the
+new queue.
+
+    >>> connection.test_input('asyncdb status\n')
+    {
+        "": {
+            "dispatchers": {
+                "282b5a6c-5a84-11dd-a9af-0017f2c49bdd": {
+                    "agents": {
+                        "filtered": {
+                            "filter": "zc.async.doctest_test.filter", 
+                            "len": 0, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:38.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 2.0
+                    }
+                }, 
+                "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd": {
+                    "agents": {
+                        "main": {
+                            "filter": null, 
+                            "len": 2, 
+                            "size": 3
+                        }
+                    }, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:22.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 18.0
+                    }
+                }
+            }, 
+            "len": 3
+        }, 
+        "alt": {
+            "dispatchers": {}, 
+            "len": 0
+        }
+    } 
+    -> CLOSE
+
+    >>> connection.test_input('asyncdb status queue:alt\n')
+    {
+        "alt": {
+            "dispatchers": {}, 
+            "len": 0
+        }
+    } 
+    -> CLOSE
+
+Let's get another poll in.  After that, we can see the dispatchers.
+
+    >>> ignore = reactor.time_flies(dispatcher.poll_interval-2)
+
+    >>> connection.test_input('asyncdb status queue:alt\n')
+    {
+        "alt": {
+            "dispatchers": {
+                "282b5a6c-5a84-11dd-a9af-0017f2c49bdd": {
+                    "agents": {}, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:43.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 0.0
+                    }
+                }, 
+                "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd": {
+                    "agents": {}, 
+                    "dead": false, 
+                    "last ping": "2006-08-10T15:44:42.000211Z", 
+                    "ping death interval": {
+                        "minutes": 1
+                    }, 
+                    "ping interval": {
+                        "seconds": 30.0
+                    }, 
+                    "since ping": {
+                        "seconds": 1.0
+                    }
+                }
+            }, 
+            "len": 0
+        }
+    } 
+    -> CLOSE
+
+We don't have any jobs in there, but if we did, they would be shown as usual.
+
+    >>> connection.test_input('asyncdb count pending queue:alt\n')
+    0 
+    -> CLOSE
+
+    >>> active_lock2.release()
+    >>> active_lock.release()
+    >>> callback_lock.release()
+
+[#tearDown]_
+
+.. [#setUp] See the discussion in other documentation for an explanation of
+    this code.
+
+    >>> import ZODB.FileStorage
+    >>> storage = ZODB.FileStorage.FileStorage(
+    ...     'zc_async.fs', create=True)
+    >>> from ZODB.DB import DB 
+    >>> db = DB(storage) 
+    >>> conn = db.open()
+    >>> root = conn.root()
+
+    >>> import zc.async.configure
+    >>> zc.async.configure.base()
+
+    >>> import zc.async.testing
+    >>> reactor = zc.async.testing.Reactor()
+    >>> reactor.start() # this monkeypatches datetime.datetime.now 
+
+    >>> import zc.async.queue
+    >>> import zc.async.interfaces
+    >>> mapping = root[zc.async.interfaces.KEY] = zc.async.queue.Queues()
+    >>> queue = mapping[''] = zc.async.queue.Queue()
+    >>> import transaction
+    >>> transaction.commit()
+
+    >>> import zc.async.dispatcher
+    >>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
+    >>> dispatcher.activate()
+    >>> reactor.time_flies(1)
+    1
+
+    >>> import zc.async.agent
+    >>> agent = zc.async.agent.Agent()
+    >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
+    >>> transaction.commit()
+
+    >>> import zc.ngi.testing
+    >>> import zc.z3monitor
+
+    >>> connection = zc.ngi.testing.TextConnection()
+    >>> server = zc.z3monitor.Server(connection)
+
+    >>> import zc.async.monitordb
+    >>> import zope.component
+    >>> import zc.z3monitor.interfaces
+    >>> zope.component.provideUtility(
+    ...     zc.async.monitordb.asyncdb,
+    ...     zc.z3monitor.interfaces.IZ3MonitorPlugin,
+    ...     'asyncdb')
+    >>> zope.component.provideUtility(zc.z3monitor.help,
+    ...     zc.z3monitor.interfaces.IZ3MonitorPlugin, 'help')
+
+.. [#tearDown]
+    >>> threads = []
+    >>> for d in (dispatcher, alt_dispatcher):
+    ...     for queue_pools in d.queues.values():
+    ...         for pool in queue_pools.values():
+    ...             threads.extend(pool.threads)
+    >>> reactor.stop()
+    >>> zc.async.testing.wait_for_deactivation(dispatcher)
+    >>> zc.async.testing.wait_for_deactivation(alt_dispatcher)
+    >>> for thread in threads:
+    ...     thread.join(3)
+    ...


Property changes on: zc.async/trunk/src/zc/async/monitordb.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: zc.async/trunk/src/zc/async/parallel_serial.txt
===================================================================
--- zc.async/trunk/src/zc/async/parallel_serial.txt	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/parallel_serial.txt	2008-09-11 03:58:09 UTC (rev 91050)
@@ -59,16 +59,17 @@
     ...         return job1.result + job2.result
     ...
     >>> def stop_dispatcher(dispatcher, lock=None):
+    ...     threads = []
+    ...     for queue_pools in dispatcher.queues.values():
+    ...         for pool in queue_pools.values():
+    ...             threads.extend(pool.threads)
     ...     dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
     ...     zc.async.testing.wait_for_deactivation(dispatcher)
     ...     dispatcher.thread.join(3)
     ...     if lock is not None:
     ...         lock.release()
-    ...     for queue_pools in dispatcher.queues.values():
-    ...         for name, pool in queue_pools.items():
-    ...             pool.setSize(0)
-    ...             for thread in pool.threads:
-    ...                 thread.join(3)
+    ...     for thread in threads:
+    ...         thread.join(3)
     ...
 
 First we'll test ``serial``.  We stop the worker once while it is working on

Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/testing.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -153,12 +153,18 @@
         end = _now + datetime.timedelta(seconds=seconds)
         ct = 0
         next = self._get_next(end)
+        then = None
         while next is not None:
             now, callable, args, kw = next
-            set_now(now)
+            if then is None or then != now:
+                time_sleep(0.5) # give threads a chance to work
+                set_now(now)
             callable(*args, **kw) # normally this would get try...except
             ct += 1
             next = self._get_next(end)
+            then = now
+        if ct:
+            time_sleep(0.5)
         set_now(end)
         return ct
 

Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/utils.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -15,10 +15,13 @@
 import logging
 import sys
 import time
+import types
 
+import persistent.interfaces
+import BTrees
 import ZEO.Exceptions
 import ZODB.POSException
-import BTrees
+import ZODB.utils
 import rwproperty
 import persistent
 import zope.minmax
@@ -328,3 +331,24 @@
             log.critical('Error while %s', identifier, exc_info=True)
             res = zc.twist.Failure()
         return res
+
+def custom_repr(obj):
+    if persistent.interfaces.IPersistent.providedBy(obj):
+        dbname = "?"
+        if obj._p_jar is not None:
+            dbname = getattr(obj._p_jar.db(), 'database_name', "?")
+            if dbname != '?':
+                dbname = repr(dbname)
+        if obj._p_oid is not None:
+            oid = ZODB.utils.u64(obj._p_oid)
+        else:
+            oid = '?'
+        return '%s.%s (oid %s, db %s)' % (
+            obj.__class__.__module__,
+            obj.__class__.__name__,
+            oid,
+            dbname)
+    elif isinstance(obj, (types.FunctionType, types.BuiltinFunctionType)):
+        return '%s.%s' % (obj.__module__, obj.__name__)
+    else:
+        return repr(obj)

Modified: zc.async/trunk/src/zc/async/z3tests.py
===================================================================
--- zc.async/trunk/src/zc/async/z3tests.py	2008-09-11 03:50:41 UTC (rev 91049)
+++ zc.async/trunk/src/zc/async/z3tests.py	2008-09-11 03:58:09 UTC (rev 91050)
@@ -39,6 +39,7 @@
     return unittest.TestSuite((
         doctest.DocFileSuite(
             'monitor.txt',
+            'monitordb.txt',
             'z3.txt',
             setUp=setUp, tearDown=zc.async.tests.modTearDown,
             optionflags=doctest.INTERPRET_FOOTNOTES),



More information about the Checkins mailing list