[Checkins] SVN: zc.async/branches/dev/ needs much more monitoring tests, but this is a checkpoint.

Gary Poster gary at zope.com
Tue Apr 1 23:11:16 EDT 2008


Log message for revision 85059:
  needs much more monitoring tests, but this is a checkpoint.

Changed:
  U   zc.async/branches/dev/buildout.cfg
  U   zc.async/branches/dev/setup.py
  U   zc.async/branches/dev/src/zc/async/README.txt
  U   zc.async/branches/dev/src/zc/async/dispatcher.py
  U   zc.async/branches/dev/src/zc/async/dispatcher.txt
  U   zc.async/branches/dev/src/zc/async/instanceuuid.py
  U   zc.async/branches/dev/src/zc/async/job.py
  U   zc.async/branches/dev/src/zc/async/job.txt
  A   zc.async/branches/dev/src/zc/async/monitor.py
  U   zc.async/branches/dev/src/zc/async/monitor.txt
  U   zc.async/branches/dev/src/zc/async/testing.py
  U   zc.async/branches/dev/src/zc/async/utils.py
  A   zc.async/branches/dev/src/zc/async/z3tests.py

-=-
Modified: zc.async/branches/dev/buildout.cfg
===================================================================
--- zc.async/branches/dev/buildout.cfg	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/buildout.cfg	2008-04-02 03:11:14 UTC (rev 85059)
@@ -2,6 +2,8 @@
 parts =
     interpreter
     test
+    z3interpreter
+    z3test
 
 develop = .
 
@@ -13,10 +15,21 @@
 [test]
 recipe = zc.recipe.testrunner
 eggs = zc.async
-defaults = '--tests-pattern [fn]?tests --exit-with-status -1'.split()
+defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1'.split()
 working-directory = ${buildout:directory}
 
+[z3test]
+recipe = zc.recipe.testrunner
+eggs = zc.async [z3]
+defaults = "--tests-pattern z3tests --exit-with-status -1".split()
+
+
 [interpreter]
 recipe = zc.recipe.egg
 eggs = zc.async
 interpreter = py
+
+[z3interpreter]
+recipe = zc.recipe.egg
+eggs = zc.async [z3]
+interpreter = z3py

Modified: zc.async/branches/dev/setup.py
===================================================================
--- zc.async/branches/dev/setup.py	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/setup.py	2008-04-02 03:11:14 UTC (rev 85059)
@@ -20,10 +20,8 @@
         'uuid',
         'zc.queue',
         'zc.dict>=1.2.1',
-        'zc.twist>=1.0.1',
-        'zc.twisted', # setup-friendly Twisted distro.  Someday soon we can
-        # discard zc.twisted, hopefully.  See
-        # http://twistedmatrix.com/trac/ticket/1286
+        'zc.twist>=1.1',
+        'Twisted>=8.0.1', # 8.0 was setuptools compatible
         'zope.bforest>=1.1',
         'zope.component',
         'zope.i18nmessageid',
@@ -31,5 +29,10 @@
         'zope.testing',
         'rwproperty',
         ],
+    extras_require={
+        'z3':[
+            'zc.z3monitor',
+            'simplejson',
+            ]},
     include_package_data=True,
     )

Modified: zc.async/branches/dev/src/zc/async/README.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README.txt	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/README.txt	2008-04-02 03:11:14 UTC (rev 85059)
@@ -530,14 +530,17 @@
     True
     >>> job.status == zc.async.interfaces.ACTIVE
     True
+
+[#stats_1]_
+
     >>> job.annotations['zc.async.test.flag'] = True
     >>> transaction.commit()
     >>> wait_for(job)
     >>> job.result
     42
 
-``getReactor`` and ``getDispatcher`` are for advanced use cases and are not
-explored further here.
+[#stats_2]_ ``getReactor`` and ``getDispatcher`` are for advanced use
+cases and are not explored further here.
 
 Job Quotas
 ==========
@@ -902,7 +905,7 @@
         the original requester.  The 2.x line addresses this with three
         changes:
 
-        + jobss are annotatable;
+        + jobs are annotatable;
 
         + jobs should not be modified in an asynchronous
           worker that does work (though they may be read);
@@ -1092,6 +1095,95 @@
     errors, do not request an annotation that might be a persistent
     object.*
 
+.. [#stats_1] The dispatcher has a ``getStatistics`` method.  Among other
+    things, it shows that there is an active task.
+
+    >>> import pprint
+    >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
+    {'failed': 2,
+     'longest active': ('\x00...', 'unnamed'),
+     'longest failed': ('\x00...', 'unnamed'),
+     'longest successful': ('\x00...', 'unnamed'),
+     'shortest active': ('\x00\...', 'unnamed'),
+     'shortest failed': ('\x00\...', 'unnamed'),
+     'shortest successful': ('\x00...', 'unnamed'),
+     'started': 12,
+     'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
+     'statistics start': datetime.datetime(2006, 8, 10, 15, 56, 47, 211),
+     'successful': 9,
+     'unknown': 0}
+
+    We can also see the active job with ``getActiveJobIds``.
+    
+    >>> job_ids = dispatcher.getActiveJobIds()
+    >>> len(job_ids)
+    1
+    >>> info = dispatcher.getJobInfo(*job_ids[0])
+    >>> pprint.pprint(info) # doctest: +ELLIPSIS
+    {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
+     'completed': None,
+     'failed': False,
+     'poll id': ...,
+     'quota names': (),
+     'result': None,
+     'started': datetime.datetime(...),
+     'thread': ...}
+    >>> info['thread'] is not None
+    True
+    >>> info['poll id'] is not None
+    True
+
+
+.. [#stats_2] Now the task is done, as the stats reflect.
+
+    >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
+    {'failed': 2,
+     'longest active': None,
+     'longest failed': ('\x00...', 'unnamed'),
+     'longest successful': ('\x00...', 'unnamed'),
+     'shortest active': None,
+     'shortest failed': ('\x00\...', 'unnamed'),
+     'shortest successful': ('\x00...', 'unnamed'),
+     'started': 12,
+     'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
+     'statistics start': datetime.datetime(...),
+     'successful': 10,
+     'unknown': 0}
+
+    The ``getActiveJobIds`` list shows the new task--which is completed, but
+    not as of the last poll, so it's still in the list.
+    
+    >>> job_ids = dispatcher.getActiveJobIds()
+    >>> len(job_ids)
+    1
+    >>> info = dispatcher.getJobInfo(*job_ids[0])
+    >>> pprint.pprint(info) # doctest: +ELLIPSIS
+    {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
+     'completed': datetime.datetime(...),
+     'failed': False,
+     'poll id': ...,
+     'quota names': (),
+     'result': '42',
+     'started': datetime.datetime(...),
+     'thread': ...}
+    >>> info['thread'] is not None
+    True
+    >>> info['poll id'] is not None
+    True
+
 .. [#stop_reactor] 
 
+    >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
+    {'failed': 2,
+     'longest active': None,
+     'longest failed': ('\x00...', 'unnamed'),
+     'longest successful': ('\x00...', 'unnamed'),
+     'shortest active': None,
+     'shortest failed': ('\x00\...', 'unnamed'),
+     'shortest successful': ('\x00...', 'unnamed'),
+     'started': 24,
+     'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
+     'statistics start': datetime.datetime(2006, 8, 10, 15, 57, 47, 211),
+     'successful': 22,
+     'unknown': 0}
     >>> reactor.stop()

Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py	2008-04-02 03:11:14 UTC (rev 85059)
@@ -1,5 +1,6 @@
 import time
 import datetime
+import bisect
 import Queue
 import thread
 import threading
@@ -7,9 +8,11 @@
 import twisted.python.failure
 import twisted.internet.defer
 import ZODB.POSException
+import BTrees
 import transaction
 import transaction.interfaces
 import zope.component
+import zope.bforest.periodic
 import zc.twist
 
 import zc.async.utils
@@ -116,11 +119,15 @@
             job = self.queue.get()
             while job is not None:
                 db, identifier, info = job
+                info['thread'] = thread.get_ident()
+                info['started'] = datetime.datetime.utcnow()
+                zc.async.utils.tracelog.info(
+                    'starting in thread %d: %r',
+                    info['thread'], info['call'])
                 conn = db.open()
                 try:
                     transaction.begin()
                     job = conn.get(identifier)
-                    info['thread'] = thread.get_ident()
                     local.job = job
                     try:
                         job() # this does the committing and retrying, largely
@@ -134,10 +141,20 @@
                                 transaction.abort() # retry forever (!)
                             else:
                                 break
+                    # should come before 'completed' for threading dance
+                    if isinstance(job.result, twisted.python.failure.Failure):
+                        info['failed'] = True
+                    info['completed'] = datetime.datetime.utcnow()
+                    info['result'] = repr(job.result)
                 finally:
                     local.job = None
                     transaction.abort()
                     conn.close()
+                zc.async.utils.tracelog.info(
+                    '%s %s in thread %d with result %s',
+                    info['call'],
+                    info['failed'] and 'failed' or 'succeeded',
+                    info['thread'], info['result'])
                 job = self.queue.get()
         finally:
             if self.dispatcher.activated:
@@ -172,21 +189,42 @@
                 self.queue.put(None)
         return size - old # size difference
 
+# this is mostly for testing
 
+_dispatchers = {}
+
+def get(uuid=None, default=None):
+    if uuid is None: # fall back to the registered IUUID utility
+        uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+    return _dispatchers.get(uuid, default)
+
+def pop(uuid=None):
+    if uuid is None: # fall back to the registered IUUID utility
+        uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+    return _dispatchers.pop(uuid)
+
+clear = _dispatchers.clear
+
 class Dispatcher(object):
 
     activated = False
+    conn = None
 
     def __init__(self, db, reactor, poll_interval=5, uuid=None):
+        if uuid is None:
+            uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+        if uuid in _dispatchers:
+            raise ValueError('dispatcher for this UUID is already registered')
+        _dispatchers[uuid] = self
         self.db = db
         self.reactor = reactor # we may allow the ``reactor`` argument to be
         # None at some point, to default to the installed Twisted reactor.
         self.poll_interval = poll_interval
-        if uuid is None:
-            uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
         self.UUID = uuid
         self.polls = zc.async.utils.Periodic(
-            period=datetime.timedelta(days=1), buckets=4)
+            period=datetime.timedelta(hours=2), buckets=3)
+        self.jobs = zope.bforest.periodic.OOBForest(
+            period=datetime.timedelta(hours=3), count=4)
         self._activated = set()
         self.queues = {}
         self.dead_pools = []
@@ -214,6 +252,7 @@
             agent.queue.name,
             agent.queue._p_oid))
         if res is None:
+            # Successful commit
             res = job
         return res
 
@@ -246,10 +285,11 @@
                 break
 
     def poll(self):
-        conn = self.db.open()
         poll_info = PollInfo()
+        started_jobs = []
+        transaction.begin() # sync and clear
         try:
-            queues = conn.root().get(zc.async.interfaces.KEY)
+            queues = self.conn.root().get(zc.async.interfaces.KEY)
             if queues is None:
                 transaction.abort()
                 return
@@ -284,14 +324,17 @@
                     pools = self.queues[queue.name] = {}
                 for name, agent in da.items():
                     job_info = []
+                    active_jobs = [
+                        (job._p_oid,
+                         getattr(job._p_jar.db(), 'database_name', None))
+                         for job in agent]
                     agent_info = queue_info[name] = {
                         'size': None, 'len': None, 'error': None,
-                        'new_jobs': job_info}
+                        'new jobs': job_info, 'active jobs': active_jobs}
                     try:
                         agent_info['size'] = agent.size
                         agent_info['len'] = len(agent)
                     except zc.twist.EXPLOSIVE_ERRORS:
-                        transaction.abort()
                         raise
                     except:
                         agent_info['error'] = zc.twist.sanitize(
@@ -316,7 +359,6 @@
                             try:
                                 agent.failure = res
                             except zc.twist.EXPLOSIVE_ERRORS:
-                                transaction.abort()
                                 raise
                             except:
                                 transaction.abort()
@@ -326,13 +368,20 @@
                                 # TODO improve msg
                                 self._commit('trying to stash failure on agent')
                         else:
-                            info = {'oid': job._p_oid,
-                                    'callable': repr(job.callable),
-                                    'begin_after': job.begin_after.isoformat(),
-                                    'quota_names': job.quota_names,
-                                    'assignerUUID': job.assignerUUID,
+                            info = {'result': None,
+                                    'failed': False,
+                                    'poll id': None,
+                                    'quota names': job.quota_names,
+                                    'call': repr(job),
+                                    'started': None,
+                                    'completed': None,
                                     'thread': None}
-                            job_info.append(info)
+                            started_jobs.append(info)
+                            dbname = getattr(
+                                job._p_jar.db(), 'database_name', None)
+                            jobid = (job._p_oid, dbname)
+                            self.jobs[jobid] = info
+                            job_info.append(jobid)
                             pool.queue.put(
                                 (job._p_jar.db(), job._p_oid, info))
                             job = self._getJob(agent)
@@ -366,8 +415,11 @@
                     self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
         finally:
             transaction.abort()
-            conn.close()
             self.polls.add(poll_info)
+            for info in started_jobs:
+                info['poll id'] = poll_info.key
+            zc.async.utils.tracelog.info(
+                'poll %s: %r', poll_info.key, poll_info)
 
     def directPoll(self):
         if not self.activated:
@@ -395,8 +447,14 @@
     def activate(self, threaded=False):
         if self.activated:
             raise ValueError('already activated')
-        self.activated = True
+        self.activated = datetime.datetime.utcnow()
+        # in case this is a restart, we clear old data
+        self.polls.clear()
+        self.jobs.clear()
+        # increase pool size to account for the dispatcher poll
         self.db.setPoolSize(self.db.getPoolSize() + 1)
+        self.conn = self.db.open() # we keep the same connection for all
+        # polls as an optimization
         if threaded:
             self.reactor.callWhenRunning(self.threadedPoll)
         else:
@@ -408,9 +466,9 @@
         if not self.activated:
             raise ValueError('not activated')
         self.activated = False
-        conn = self.db.open()
+        transaction.begin()
         try:
-            queues = conn.root().get(zc.async.interfaces.KEY)
+            queues = self.conn.root().get(zc.async.interfaces.KEY)
             if queues is not None:
                 for queue in queues.values():
                     da = queue.dispatchers.get(self.UUID)
@@ -419,7 +477,7 @@
                 self._commit('trying to tear down')
         finally:
             transaction.abort()
-            conn.close()
+            self.conn.close()
         conn_delta = 0
         for queue_pools in self.queues.values():
             for name, pool in queue_pools.items():
@@ -427,3 +485,202 @@
                 self.dead_pools.append(queue_pools.pop(name))
         conn_delta -= 1
         self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
+
+    # these methods are used for monitoring and analysis
+
+    STOPPED = 'STOPPED'
+    RUNNING = 'RUNNING'
+    STUCK = 'STUCK'
+    STARTING = 'STARTING'
+
+    def getStatusInfo(self):
+        res = {'time since last poll': None, 'uptime': None, 'uuid': self.UUID}
+        poll_interval = res['poll interval'] = datetime.timedelta(
+                    seconds=self.poll_interval)
+        if not self.activated:
+            res['status'] = self.STOPPED
+        else:
+            now = datetime.datetime.utcnow()
+            try:
+                poll = self.polls.first()
+            except ValueError:
+                # no polls
+                next = self.activated + poll_interval
+                if next < now:
+                    res['status'] = self.STUCK
+                else:
+                    res['status'] = self.STARTING
+                res['time since last poll'] = now - self.activated
+            else:
+                next = poll.utc_timestamp + poll_interval
+                if next < now:
+                    res['status'] = self.STUCK
+                else:
+                    res['status'] = self.RUNNING
+                res['time since last poll'] = now - poll.utc_timestamp
+                res['uptime'] = now - self.activated
+        return res
+
+    def getJobInfo(self, oid, database_name=None):
+        """Return the info dict recorded for the job (oid, database_name)."""
+        if database_name is None:
+            # bracket all (oid, name) keys; unknown oids raise ValueError.
+            minKey = self.jobs.minKey((oid,))
+            maxKey = self.jobs.maxKey((oid + '\x00',))
+            if minKey != maxKey:
+                raise ValueError('ambiguous database name')
+            database_name = minKey[1]
+        return self.jobs[(oid, database_name)]
+
+    def getActiveJobIds(self, queue=None, agent=None):
+        """returns active jobs from newest to oldest"""
+        res = []
+        try:
+            poll = self.polls.first()
+        except ValueError:
+            pass
+        else:
+            old = []
+            unknown = []
+            for info in _iter_info(poll, queue, agent):
+                res.extend(info['new jobs'])
+                for job_id in info['active jobs']:
+                    job_info = self.jobs.get(job_id)
+                    if job_info is None:
+                        unknown.append(job_id)
+                    else:
+                        bisect.insort(old, (job_info['poll id'], job_id))
+            res.extend(i[1] for i in old)
+            res.extend(unknown)
+        return res
+
+    def getPollInfo(self, at=None, before=None):
+        if at is not None:
+            if before is not None:
+                raise ValueError('may only provide one of `at` and `before`')
+            if isinstance(at, datetime.datetime):
+                at = zc.async.utils.dt_to_long(at)
+        elif before is not None:
+            if isinstance(before, datetime.datetime):
+                at = zc.async.utils.dt_to_long(before) + 16
+            else:
+                at = before + 1
+        for bucket in tuple(self.polls._data.buckets): # freeze order
+            try:
+                if at is None:
+                    key = bucket.minKey()
+                else:
+                    key = bucket.minKey(at)
+                return bucket[key]
+            except (ValueError, KeyError):
+                # ValueError because minKey might not have a value
+                # KeyError because bucket might be cleared in another thread
+                # between minKey and __getitem__
+                pass
+        raise ValueError('no poll matches')
+
+    def iterPolls(self, at=None, before=None, since=None, count=None):
+        # `polls` may be mutated during iteration so we don't iterate over it
+        if at is not None and before is not None:
+            raise ValueError('may only provide one of `at` and `before`')
+        if isinstance(since, datetime.datetime):
+            since = zc.async.utils.dt_to_long(since) + 15
+        ct = 0
+        while 1:
+            if count is not None and ct >= count:
+                break
+            try:
+                info = self.getPollInfo(at=at, before=before)
+            except ValueError:
+                break
+            else:
+                if since is None or before <= since:
+                    yield info
+                    ct += 1
+                    before = info.key
+                    at = None
+                else:
+                    break
+
+    def getStatistics(self, at=None, before=None, since=None, queue=None,
+                      agent=None):
+        """Return a dict of counts and extreme-duration job ids for polls.
+
+        `at`, `before` and `since` are passed to ``iterPolls`` to choose the
+        polls; `queue` and `agent`, if given, restrict the agents examined.
+        Raises ValueError if both `at` and `before` are provided.
+        """
+        if at is not None and before is not None:
+            raise ValueError('may only provide one of `at` and `before`')
+        res = {'started': 0, 'successful': 0, 'failed': 0, 'unknown': 0}
+        _pair = (None, None) # (duration, jobid); index 0 shortest, 1 longest
+        successful_extremes = [_pair, _pair]
+        failed_extremes = [_pair, _pair]
+        active_extremes = [_pair, _pair]
+        now = datetime.datetime.utcnow()
+        first = True
+        poll = first_poll = None
+        def process(jobs):
+            for jobid in jobs:
+                jobinfo = self.jobs.get(jobid)
+                if jobinfo is None:
+                    res['unknown'] += 1
+                    continue
+                if jobinfo['completed']:
+                    if jobinfo['failed']:
+                        pair = failed_extremes
+                        res['failed'] += 1
+                    else:
+                        pair = successful_extremes
+                        res['successful'] += 1
+                else:
+                    pair = active_extremes
+                start = jobinfo['started'] or poll_time
+                stop = jobinfo['completed'] or now
+                duration = stop - start
+                if pair[0][0] is None or pair[0][0] > duration:
+                    pair[0] = (duration, jobid)
+                if pair[1][0] is None or pair[1][0] < duration:
+                    pair[1] = (duration, jobid)
+        for poll in self.iterPolls(at=at, before=before, since=since):
+            poll_time = poll.utc_timestamp
+            for agent_info in _iter_info(poll, queue, agent):
+                res['started'] += len(agent_info['new jobs'])
+                process(agent_info['new jobs'])
+            if first:
+                first = False
+                first_poll = poll
+        if poll is not None:
+            for agent_info in _iter_info(poll, queue, agent):
+                process(agent_info['active jobs'])
+        if first_poll is not None:
+            stat_start = first_poll.utc_timestamp
+            stat_end = poll.utc_timestamp
+        else:
+            stat_start = None
+            stat_end = None
+        res.update({
+            'shortest successful': successful_extremes[0][1],
+            'longest successful': successful_extremes[1][1],
+            'shortest failed': failed_extremes[0][1],
+            'longest failed': failed_extremes[1][1],
+            'shortest active': active_extremes[0][1],
+            'longest active': active_extremes[1][1],
+            'statistics start': stat_start,
+            'statistics end': stat_end,
+            })
+        return res
+
+def _iter_info(poll, queue, agent):
+    if queue is None:
+        queues = poll.values()
+    elif queue not in poll:
+        queues = []
+    else:
+        queues = [poll[queue]]
+    for q in queues:
+        if agent is None:
+            for i in q.values():
+                yield i
+        elif agent in q:
+            yield q[agent]

Modified: zc.async/branches/dev/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.txt	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/dispatcher.txt	2008-04-02 03:11:14 UTC (rev 85059)
@@ -189,7 +189,11 @@
     >>> poll = get_poll()
     >>> import pprint
     >>> pprint.pprint(dict(poll))
-    {'': {'main': {'new_jobs': [], 'error': None, 'len': 0, 'size': 3}}}
+    {'': {'main': {'active jobs': [],
+                   'error': None,
+                   'len': 0,
+                   'new jobs': [],
+                   'size': 3}}}
 
 The pool size for the db has increased again to account for the size of the
 agent.
@@ -215,22 +219,39 @@
 
     >>> for poll in dispatcher.polls:
     ...     if (poll.get('') and poll[''].get('main') and
-    ...         poll['']['main']['new_jobs']):
+    ...         poll['']['main']['new jobs']):
     ...         break
     ... else:
     ...     assert False, 'poll not found'
     ...
     >>> pprint.pprint(dict(poll)) # doctest: +ELLIPSIS
-    {'': {'main': {'error': None,
+    {'': {'main': {'active jobs': [],
+                   'error': None,
                    'len': 0,
-                   'new_jobs': [{'assignerUUID': UUID(...),
-                                 'begin_after': '...',
-                                 'callable': '<built-in function mul>',
-                                 'oid': '...',
-                                 'quota_names': (),
-                                 'thread': ...}],
+                   'new jobs': [('\x00...', 'unnamed')],
                    'size': 3}}}
 
+[#getPollInfo]_ Notice our ``new jobs`` has a value in it now.  We can get
+some information about that job from the dispatcher.
+
+    >>> info = dispatcher.getJobInfo(*poll['']['main']['new jobs'][0])
+    >>> pprint.pprint(info)
+    ... # doctest: +ELLIPSIS
+    {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``<built-in function mul>(14, 3)``>",
+     'completed': datetime.datetime(...),
+     'failed': False,
+     'poll id': ...,
+     'quota names': (),
+     'result': '42',
+     'started': datetime.datetime(...),
+     'thread': ...}
+    >>> info['thread'] is not None
+    True
+    >>> info['poll id'] is not None
+    True
+
+Notice that the result is a repr.
+
 As seen in other documents in zc.async, the job can also be a method of a
 persistent object, affecting a persistent object.
 
@@ -287,6 +308,31 @@
     >>> wait_for_result(job4)
     "reply is 'HIYA'.  Locally it is MISSING."
 
+We can analyze the work the dispatcher has done.  The records for this
+generally only go back a few hours.
+
+    >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
+    {'failed': 0,
+     'longest active': None,
+     'longest failed': None,
+     'longest successful': ('\x00...', 'unnamed'),
+     'shortest active': None,
+     'shortest failed': None,
+     'shortest successful': ('\x00...', 'unnamed'),
+     'started': 5,
+     'statistics end': datetime.datetime(...),
+     'statistics start': datetime.datetime(...),
+     'successful': 5,
+     'unknown': 0}
+
+We can get a report on the reactor's status.
+
+    >>> pprint.pprint(dispatcher.getStatusInfo()) # doctest: +ELLIPSIS
+    {'poll interval': datetime.timedelta(0, 0, 500000),
+     'status': 'RUNNING',
+     'time since last poll': datetime.timedelta(...),
+     'uptime': datetime.timedelta(...)}
+
 When we stop the reactor, the dispatcher also deactivates.
 
     >>> reactor.callFromThread(reactor.stop)
@@ -298,11 +344,22 @@
     ...     assert False, 'dispatcher did not deactivate'
     ...
 
+    >>> pprint.pprint(dispatcher.getStatusInfo()) # doctest: +ELLIPSIS
+    {'poll interval': datetime.timedelta(0, 0, 500000),
+     'status': 'STOPPED',
+     'time since last poll': None,
+     'uptime': None,
+     'uuid': UUID('...')}
+
 The db's pool size has returned to the original value.
 
     >>> db.getPoolSize()
     7
 
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
 .. [#setUp]
 
     >>> import ZODB.FileStorage
@@ -377,6 +434,24 @@
     ...         assert False, 'job never completed'
     ...
 
+.. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
+    find this poll information also.
+    
+    >>> dispatcher.getPollInfo(at=poll.key) is poll
+    True
+    >>> dispatcher.getPollInfo(at=poll.utc_timestamp) is poll
+    True
+    >>> dispatcher.getPollInfo(before=poll.key) is not poll
+    True
+    >>> dispatcher.getPollInfo(before=poll.utc_timestamp) is not poll
+    True
+    >>> dispatcher.getPollInfo(before=poll.key-16) is poll
+    True
+    >>> dispatcher.getPollInfo(
+    ...     before=poll.utc_timestamp + datetime.timedelta(seconds=0.4)
+    ...     ) is poll
+    True
+
 .. [#wait_for_annotation]
 
     >>> def wait_for_annotation(job, name):

Modified: zc.async/branches/dev/src/zc/async/instanceuuid.py
===================================================================
--- zc.async/branches/dev/src/zc/async/instanceuuid.py	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/instanceuuid.py	2008-04-02 03:11:14 UTC (rev 85059)
@@ -14,9 +14,9 @@
 workers to connect to a single database to do work.  The software
 expects an instance home to only generate a single process.
 
-To get a new identifier for this software instance, delete this file and
-restart Zope (or more precisely, delete this file, restart Python, and
-import zc.async.instanceuuid).  This file will be recreated with a new value.
+To get a new identifier for this software instance, delete this file,
+restart Python, and import zc.async.instanceuuid.  This file will be
+recreated with a new value.
 """
 
 zope.interface.classImplements(uuid.UUID, zc.async.interfaces.IUUID)

Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/job.py	2008-04-02 03:11:14 UTC (rev 85059)
@@ -18,6 +18,27 @@
 import zc.async.interfaces
 import zc.async.utils
 
+def _repr(obj):
+    if isinstance(obj, persistent.Persistent):
+        dbname = "?"
+        if obj._p_jar is not None:
+            dbname = getattr(obj._p_jar.db(), 'database_name', "?")
+            if dbname != '?':
+                dbname = repr(dbname)
+        if obj._p_oid is not None:
+            oid = ZODB.utils.u64(obj._p_oid)
+        else:
+            oid = '?'
+        return '%s.%s (oid %s, db %s)' % (
+            obj.__class__.__module__,
+            obj.__class__.__name__,
+            oid,
+            dbname)
+    elif isinstance(obj, types.FunctionType):
+        return '%s.%s' % (obj.__module__, obj.__name__)
+    else:
+        return repr(obj)
+
 def success_or_failure(success, failure, res):
     callable = None
     if isinstance(res, twisted.python.failure.Failure):
@@ -53,7 +74,7 @@
     _quota_names = ()
 
     def __init__(self, *args, **kwargs):
-        self.args = persistent.list.PersistentList(args)
+        self.args = persistent.list.PersistentList(args) # TODO: blist
         self.callable = self.args.pop(0)
         self.kwargs = persistent.mapping.PersistentMapping(kwargs)
         self.callbacks = zc.queue.PersistentQueue()
@@ -167,6 +188,23 @@
         res.args.insert(0, res)
         return res
 
+    def __repr__(self):
+        try:
+            call = _repr(self._callable_root)
+            if self._callable_name is not None:
+                call += ' :' + self._callable_name
+            args = ', '.join(_repr(a) for a in self.args)
+            kwargs = ', '.join(k+"="+_repr(v) for k, v in self.kwargs.items())
+            if args:
+                if kwargs:
+                    args += ", " + kwargs
+            else:
+                args = kwargs
+            return '<%s ``%s(%s)``>' % (_repr(self), call, args)
+        except (TypeError, ValueError, AttributeError):
+            # broken reprs are a bad idea; they obscure problems
+            return super(Job, self).__repr__()
+
     @property
     def callable(self):
         if self._callable_name is None:

Modified: zc.async/branches/dev/src/zc/async/job.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/job.txt	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/job.txt	2008-04-02 03:11:14 UTC (rev 85059)
@@ -39,8 +39,15 @@
     >>> import transaction
     >>> transaction.commit()
 
-Now we have a job [#verify]_.  Initially it has a NEW status.
+Now we have a job [#verify]_.  The __repr__ tries to be helpful, identifying
+the persistent object identifier ("oid") as an integer and the database
+("db"), and trying to render the call.
 
+    >>> j # doctest: +ELLIPSIS
+    <zc.async.job.Job (oid ... db 'unnamed') ``zc.async.doctest_test.call()``>
+
+Initially it has a NEW status.
+
     >>> import zc.async.interfaces
     >>> j.status == zc.async.interfaces.NEW
     True
@@ -75,7 +82,14 @@
     >>> demo.counter
     0
     >>> j = root['j'] = zc.async.job.Job(demo.increase)
+    >>> j # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    <zc.async.job.Job (oid ?, db ?)
+     ``zc.async.doctest_test.Demo (oid ?, db ?) :increase()``>
+
     >>> transaction.commit()
+    >>> j # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    <zc.async.job.Job (oid ..., db 'unnamed')
+     ``zc.async.doctest_test.Demo (oid ..., db 'unnamed') :increase()``>
     >>> j() # result is None
     >>> demo.counter
     1
@@ -197,6 +211,10 @@
     >>> j = root['j3'] = zc.async.job.Job(
     ...     argCall, root['demo2'], value=4)
     >>> transaction.commit()
+    >>> j # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    <zc.async.job.Job (oid ..., db 'unnamed')
+     ``zc.async.doctest_test.argCall(zc.async.doctest_test.PersistentDemo (oid ..., db 'unnamed'),
+                                     value=4)``>
     >>> j()
     >>> root['demo2'].value
     4
@@ -274,7 +292,7 @@
     ...     print "failure.", f
     ...
     >>> j.addCallbacks(success, failure) # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job ...>
     >>> res = j()
     success! 15
 
@@ -283,7 +301,7 @@
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, None)
     >>> transaction.commit()
     >>> j.addCallbacks(success, failure) # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
     >>> res = j() # doctest: +ELLIPSIS
     failure. [Failure instance: Traceback: exceptions.TypeError...]
 
@@ -298,9 +316,9 @@
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, 3)
     >>> transaction.commit()
     >>> j.addCallbacks(success) # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
     >>> j.addCallbacks(also_success) # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
     >>> res = j()
     success! 15
     also a success! 15
@@ -308,9 +326,9 @@
     >>> j = root['j'] = zc.async.job.Job(multiply, 5, None)
     >>> transaction.commit()
     >>> j.addCallbacks(failure=failure) # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
     >>> j.addCallbacks(failure=also_failure) # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
     >>> res = j() # doctest: +ELLIPSIS
     failure. [Failure instance: Traceback: exceptions.TypeError...]
     also a failure. [Failure instance: Traceback: exceptions.TypeError...]
@@ -346,7 +364,7 @@
     >>> transaction.commit()
     >>> j.addCallbacks(zc.async.job.Job(multiply, 4)
     ...               ).addCallbacks(success) # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
     >>> res = j()
     success! 60
 
@@ -359,7 +377,7 @@
     >>> transaction.commit()
     >>> j.addCallbacks(
     ...     failure=handle_failure).addCallbacks(success) # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
     >>> res = j()
     success! 0
 
@@ -536,7 +554,7 @@
     >>> transaction.commit()
     >>> j.args.append(j)
     >>> res = j() # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
 
 A class method on Job, ``bind``, can simplify this.  It puts the job as
 the first argument to the callable, as if the callable were bound as a method
@@ -545,7 +563,7 @@
     >>> j = root['j'] = zc.async.job.Job.bind(show)
     >>> transaction.commit()
     >>> res = j() # doctest: +ELLIPSIS
-    <zc.async.job.Job object at ...>
+    <zc.async.job.Job (oid ...>
 
 Result and Status
 -----------------

Added: zc.async/branches/dev/src/zc/async/monitor.py
===================================================================
--- zc.async/branches/dev/src/zc/async/monitor.py	                        (rev 0)
+++ zc.async/branches/dev/src/zc/async/monitor.py	2008-04-02 03:11:14 UTC (rev 85059)
@@ -0,0 +1,291 @@
+import re
+import datetime
+import pytz
+import uuid
+import simplejson
+
+import zope.component
+
+import zc.async.dispatcher
+
+_marker = object()
+class Encoder(simplejson.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, datetime.timedelta):
+            tmp = {'days': obj.days,
+                   'hours': obj.seconds // (60*60),
+                   'minutes': (obj.seconds % (60*60)) // 60,
+                   'seconds': float(
+                        obj.seconds % 60) + obj.microseconds/1000000
+                  }
+            res = dict((k, v) for k, v in tmp.items() if v)
+            if not res:
+                res['seconds'] = 0
+            return res
+        # TODO the spelling of this conditional is to support our test setup
+        # shenanigans.  originally was ``isinstance(obj, datetime.datetime)``.
+        # Would be nice to fix, though the duck typing is Pythonic at least.
+        elif (getattr(obj, 'tzinfo', _marker) is not _marker and
+              getattr(obj, 'astimezone', _marker) is not _marker):
+            if obj.tzinfo is not None:
+                obj = obj.astimezone(pytz.UTC).replace(tzinfo=None)
+            return obj.isoformat() + "Z"
+        elif isinstance(obj, uuid.UUID):
+            return str(obj)
+        return simplejson.JSONEncoder.default(self, obj)
+
+encoder = Encoder(sort_keys=True, indent=4) 
+
+
+def status(uuid=None):
+    """Get general zc.async dispatcher information.
+    
+    'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED'."""
+    if uuid is not None:
+        uuid = uuid.UUID(uuid)
+    return encoder.encode(zc.async.dispatcher.get(uuid).getStatusInfo())
+
+def jobs(queue=None, agent=None, uuid=None):
+    """Show active jobs as of last poll, sorted from newest to oldest.
+    
+    Usage:
+
+        jobs
+        (returns active jobs as of last poll, newest to oldest)
+
+        jobs queue:<queue name>
+        (jobs are filtered to those coming from the named queue)
+        
+        jobs agent:<agent name>
+        (jobs are filtered to those coming from agents with given name)
+
+    "queue:" and "agent:" modifiers may be combined.
+
+    Example:
+
+        async jobs queue: agent:main
+        (results filtered to queue named '' and agent named 'main')"""
+    if uuid is not None:
+        uuid = uuid.UUID(uuid)
+    return encoder.encode(
+        zc.async.dispatcher.get(uuid).getActiveJobIds(queue, agent))
+
+def job(OID, database=None, uuid=None):
+    """Local information about a job as of last poll, if known.
+
+    Does not consult ZODB, but in-memory information."""
+    if uuid is not None:
+        uuid = uuid.UUID(uuid)
+    return encoder.encode(
+        zc.async.dispatcher.get(uuid).getJobInfo(OID, database))
+
+_find = re.compile(r'\d+[DHMS]').findall
+def _dt(s):
+    if s is None:
+        res = s
+    else:
+        try:
+            res = int(s)
+        except ValueError:
+            vals = {}
+            for val in _find(s.upper()):
+                vals[val[-1]] = int(val[:-1])
+            res = datetime.timedelta(
+                days=vals.get('D', 0),
+                hours=vals.get('H', 0),
+                minutes=vals.get('M', 0),
+                seconds=vals.get('S', 0)) + datetime.datetime.utcnow()
+    return res
+                
+
+def jobstats(at=None, before=None, since=None, queue=None, agent=None,
+             uuid=None):
+    """Statistics on historical jobs as of last poll.
+    
+    Usage:
+
+        jobstats
+        (returns statistics on historical jobs as of last poll)
+
+        jobstats queue:<queue name>
+        (statistics are filtered to those coming from the named queue)
+
+        jobstats agent:<agent name>
+        (statistics are filtered to those coming from agents with given name)
+
+        jobstats at:<poll key or interval>
+        (statistics are collected at or before the poll key or interval)
+
+        jobstats before:<pollkey or interval>
+        (statistics are collected before the poll key or interval)
+
+        jobstats since:<pollkey or interval>
+        (statistics are collected since poll key or interval, inclusive)
+
+    The modifiers "queue:", "agent:", "since:", and one of "at:" or "before:"
+    may be combined.
+
+    Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+    be replaced with a positive integer, and "D," "H," "M," and "S" are
+    literals standing for "days," "hours," "minutes," and "seconds." 
+    For instance, you might use ``5M`` for five minutes, ``20S`` for
+    twenty seconds, or ``1H30M`` for an hour and a half.
+    
+    Poll keys are the values shown as "key" from the ``poll`` or ``polls``
+    command.
+
+    Example:
+
+        async jobstats queue: agent:main since:1H
+        (results filtered to queue named '' and agent named 'main' from now
+         till one hour ago)"""
+    # TODO: parse since and before to datetimes
+    if uuid is not None:
+        uuid = uuid.UUID(uuid)
+    return encoder.encode(
+        zc.async.dispatcher.get(uuid).getStatistics(
+            _dt(at), _dt(before), _dt(since), queue, agent))
+
+def poll(at=None, before=None, uuid=None):
+    """Get information about a single poll, defaulting to most recent.
+    
+    Usage:
+    
+        poll
+        (returns most recent poll)
+        
+        poll at:<poll key or interval>
+        (returns poll at or before the poll key or interval)
+        
+        poll before:<poll key or interval>
+        (returns poll before the poll key or interval)
+
+    Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+    be replaced with a positive integer, and "D," "H," "M," and "S" are
+    literals standing for "days," "hours," "minutes," and "seconds." 
+    For instance, you might use ``5M`` for five minutes, ``20S`` for
+    twenty seconds, or ``1H30M`` for an hour and a half.
+    
+    Example:
+    
+        async poll at:5M
+        (get the poll information at five minutes ago or before)"""
+    # TODO: parse at and before to datetimes
+    if uuid is not None:
+        uuid = uuid.UUID(uuid)
+    info = zc.async.dispatcher.get(uuid).getPollInfo(_dt(at), _dt(before))
+    res = {'key': info.key, 'time': info.utc_timestamp.isoformat() + "Z",
+           'results': info}
+    return encoder.encode(res)
+
+def polls(at=None, before=None, since=None, count=None, uuid=None):
+    """Get information about recent polls, defaulting to most recent.
+    
+    Usage:
+    
+        polls
+        (returns most recent 3 poll)
+        
+        polls at:<poll key or interval>
+        (returns up to 3 polls at or before the poll key or interval)
+        
+        polls before:<poll key or interval>
+        (returns up to 3 polls before the poll key or interval)
+        
+        polls since:<poll key or interval>
+        (returns polls since the poll key or interval, inclusive)
+        
+        polls count <positive integer>
+        (returns the given number of the most recent files)
+
+    The modifiers "since:", "count:", and one of "at:" or "before:" may
+    be combined.
+
+    Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+    be replaced with a positive integer, and "D," "H," "M," and "S" are
+    literals standing for "days," "hours," "minutes," and "seconds." 
+    For instance, you might use ``5M`` for five minutes, ``20S`` for
+    twenty seconds, or ``1H30M`` for an hour and a half.
+    
+    Example:
+    
+        async polls before:5M since:10M
+        (get the poll information from 5 to 10 minutes ago)"""
+    if uuid is not None:
+        uuid = uuid.UUID(uuid)
+    if count is None:
+        if since is None:
+            count = 3
+    else:
+        count = int(count)
+    return encoder.encode(
+        [{'key': p.key, 'time': p.utc_timestamp.isoformat() + "Z",
+          'results': p}
+         for p in zc.async.dispatcher.get(uuid).iterPolls(
+            _dt(at), _dt(before), _dt(since), count)])
+
+# provide in async and separately:
+
+def utcnow():
+    """Return the current time in UTC, in ISO 8601 format."""
+    return datetime.datetime.utcnow().isoformat() + "Z"
+
+def UUID():
+    """Get instance UUID in hex."""
+    res = zope.component.getUtility(zc.async.interfaces.IUUID)
+    if res is not None:
+        return str(res)
+
+funcs = {}
+
+def help(cmd=None):
+    """Get help on an async monitor tool.
+    
+    Usage is 'async help <tool name>' or 'async help'."""
+    if cmd is None:
+        res = [
+            "These are the tools available.  Usage for each tool is \n"
+            "'async <tool name> [modifiers...]'.  Learn more about each \n"
+            "tool using 'async help <tool name>'.\n"]
+        for nm, func in sorted(funcs.items()):
+            res.append('%s: %s' % (
+                nm, func.__doc__.split('\n', 1)[0]))
+        return '\n'.join(res)
+    f = funcs.get(cmd)
+    if f is None:
+        return 'Unknown async tool'
+    return f.__doc__
+
+for f in status, jobs, job, jobstats, poll, polls, utcnow, UUID, help:
+    funcs[f.__name__] = f
+
+def async(connection, cmd=None, *raw):
+    """A collection of tools to monitor zc.async activity in this process.
+    
+    To see a list of async tools, use 'async help'.
+    
+    To learn more about an async monitor tool, use 'async help <tool name>'."""
+    if cmd is None:
+        res = async.__doc__
+    else:
+        f = funcs.get(cmd)
+        if f is None:
+            res = '[Unknown async tool]'
+        else:
+            args = []
+            kwargs = {}
+            for val in raw:
+                if ':' in val:
+                    key, val = val.split(':', 1)
+                    kwargs[key] = val
+                else:
+                    if kwargs:
+                        raise ValueError(
+                            'placeful modifiers must come before named '
+                            'modifiers')
+                    args.append(val)
+            res = f(*args, **kwargs)
+    connection.write(res)
+    connection.write('\n')
+
+    

Modified: zc.async/branches/dev/src/zc/async/monitor.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/monitor.txt	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/monitor.txt	2008-04-02 03:11:14 UTC (rev 85059)
@@ -1,191 +1,377 @@
-THIS SHOULD BE ONLY RUN WITH A CUSTOM SET UP.  BASIC zc.async SHOULD NOT
-DEPEND ON Z3MONITOR
+Monitoring Dispatchers
+======================
 
-Monitoring Agents
-=================
+A process's zc.async dispatcher [#setUp]_ can be monitored in-process via
+zc.z3monitor plugins.  Let's imagine we have a connection over which we
+can send text messages to the monitor server [#z3monitor_setup]_.
 
-Agents can be monitored in-process via zc.z3monitor plugins.  Let's imagine
-we have a connection over which we can send text messages to the monitor
-server [#z3monitor_setup]_.
-
 All monitoring is done through the ``async`` command.  Here is its
 description, using the zc.z3monitor ``help`` command.
 
     >>> connection.test_input('help async\n')
+    Help for async:
+    <BLANKLINE>
     A collection of tools to monitor zc.async activity in this process.
-    
-    To learn more about an async monitor tool, use 'async help <tool name>'.
-    
-    These are the tools available.  Usage for each tool is
-    'async <tool name> [arguments...]'.
-    
-    help <tool name>: Get help on the named tool.
-    agents [poll_timeout]: Get information about agents in this process.
-    jobs: Get information about current jobs.
-    jobhistory [time]: Get historical statistics of jobs.
-    agent <agent name>: Get detailed information about a given agent.
-    agentjobs <agent name>: Get detailed information about the agent's jobs.
-    job <oid> [database name]: Get information about a job.
-    log <agent name> count: Get up to 100 of the last log entries for a given agent.
+    <BLANKLINE>
+        To see a list of async tools, use 'async help'.
+    <BLANKLINE>
+        To learn more about an async monitor tool, use 'async help <tool name>'.
     -> CLOSE
 
 As you can see, you use ``async help`` to get more information about each
 async-specific command.
 
     >>> connection.test_input('async help\n')
-    Get help on an async monitor tool.
-    
-    Usage is 'async help <tool name>'.  'async help' (or 'async help help')
-    returns this message.
-    
-    These are the tools available.
-    
-    help <tool name>: Get help on the named tool.
-    agents [poll_timeout]: Get information about agents in this process.
-    jobs: Get information about jobs
-    jobhistory [time]: Get information about job history.
-    agent <agent name>: Get information about a given agent.
-    agentjobs <agent name>: Get information about the agent's jobs.
-    agentjobhistory <agent name> [time]: Statistics on agent's job history.
-    job <oid> [database name]: Get information about a job.
+    These are the tools available.  Usage for each tool is 
+    'async <tool name> [modifiers...]'.  Learn more about each 
+    tool using 'async help <tool name>'.
+    <BLANKLINE>
+    UUID: Get instance UUID in hex.
+    help: Get help on an async monitor tool.
+    job: Local information about a job as of last poll, if known.
+    jobs: Show active jobs as of last poll, sorted from newest to oldest.
+    jobstats: Statistics on historical jobs as of last poll.
+    poll: Get information about a single poll, defaulting to most recent.
+    polls: Get information about recent polls, defaulting to most recent.
+    status: Get general zc.async dispatcher information.
+    utcnow: Return the current time in UTC, in ISO 8601 format. 
     -> CLOSE
 
-Let's get the help for each async command now.  The ``agents`` command is
-the broadest summary.
+Let's give a quick run through these for an overview, and then we'll dig in
+just a bit.
 
-    >>> connection.test_input('async help agents\n')
-    Get information about agents in this process.
-    
-    Usage: async agents [poll_timeout]
-    
-    If ``poll_timeout`` is not provided, it defaults to 10 (seconds).
-    
-    Returns this information, an element per line:
-    
-    - the UID of this process;
-    - the total number of registered agents;
-    - the names of registered agents;
-    - the number of running agents;
-    - the names of running agents;
-    - the number of agents that have not polled within the past poll_timeout
-      seconds;
-    - the names of agents that have not polled within the last poll_timeout
-      seconds;
+The ``UUID`` command returns the instance's UUID.
+
+    >>> connection.test_input('async help UUID\n')
+    Get instance UUID in hex. 
     -> CLOSE
 
-The ``jobs`` command gives a summary of the current active jobs for all
-agents.
+    >>> connection.test_input('async UUID\n')
+    d10f43dc-ffdf-11dc-abd4-0017f2c49bdd 
+    -> CLOSE
 
-    >>> connection.test_input('async help jobs')
-    Get information about jobs
-    
-    Usage: async jobs
-    
-    Returns this information, an element per line:
-    
-    - the total number of active jobs with a connection in all agents;
-    - the total number of active jobs in all agents;
-    - the time in seconds of the oldest active job, or 0;
-    - the name of the agent with the oldest active job, or an empty line;
-    - the oid, database_name of the oldest active job, or an empty line;
-    - the time in seconds of the oldest active job without a connection, or 0;
-    - the name of the agent with the oldest active job without a connection,
-      or an empty line;
-    - the oid, database_name of the oldest active job without a connection,
-      or an empty line;
+The ``utcnow`` command returns the current time in UTC.  This can be
+convenient to decipher the meaning of UTC datetimes returned from other
+commands.
+
+    >>> connection.test_input('async help utcnow\n')
+    Return the current time in UTC, in ISO 8601 format. 
     -> CLOSE
 
-The ``jobhistory`` command gives information on the past jobs run by all
-agents.
+    >>> connection.test_input('async utcnow\n')
+    2006-08-10T15:44:23.000211Z 
+    -> CLOSE
 
-    >>> connection.test_input('async help jobhistory')
-    Get historical statistics of jobs.
-    
-    Usage: async jobhistory [time]
-    
-    If ``time`` is not provided, returns information since process started.
-    
-    Otherwise, ``time`` should be any of the following:
-    [1-...]d
-    [1-23]h
-    [1-59]m
-    where "d" indicates day or days, "h" hour or hours, and "m" minute or
-    minutes.
-    
-    Returns this information, an element per line:
+The ``status`` command is the first of the "serious" monitoring
+commands.  As such, it starts some patterns that the rest of the
+commands will follow.
 
-    - the total number of jobs started by all registered agents in this
-      process, in time period if given;    
-    - the total number of jobs completed by all registered agents in this
-      process, in time period if given;
-    - the total number of jobs completed by all registered agents in this
-      process that ended in a Failure, in time period if given;
-    - the name of the agent with the most completed jobs in this process, in
-      time period if given;
-    - the name of the agent with the highest total number of failures in this
-      process, in time period if given;
-    - the name of the agent with the most started jobs in this process, in
-      time period if given.
+- output is pretty-printed JSON
+
+- durations are in a dict of keys 'days', 'hours', 'minutes', and 'seconds',
+  with all as ints except seconds, which is a float.
+
+    >>> connection.test_input('async help status\n')
+    Get general zc.async dispatcher information.
+    <BLANKLINE>
+        'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED'. 
     -> CLOSE
 
-The ``agent`` tool gives information specific to an individual agent.
+    >>> connection.test_input('async status\n')
+    {
+        "poll interval": {
+            "seconds": 5.0
+        }, 
+        "status": "RUNNING", 
+        "time since last poll": {
+            "seconds": 1.0
+        }, 
+        "uptime": {
+            "seconds": 1.0
+        }, 
+        "uuid": "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd"
+    } 
+    -> CLOSE
 
-    >>> connection.test_input('async help agent\n')
-    Get information about a given agent.
-    
-    Usage: async agent <agent name> [time]
-    
-    If ``time`` is not provided, returns information since process started.
-    
-    Otherwise, ``time`` should be any of the following:
-    [1-...]d
-    [1-23]h
-    [1-59]m
-    where "d" indicates day or days, "h" hour or hours, and "m" minute or
-    minutes.
-    
-    Returns this information, an element per line:
-    
-    - number of seconds since last poll;
-    - total number of worker threads/connections for this agent;
-    - number of active jobs with a connection for this agent;
-    - number of active jobs for this agent;
-    - the time in seconds of the oldest active job, or 0;
-    - the time in seconds of the oldest active job without a connection, or 0;
-    - the oid, database_name of the oldest active job, or an empty line;
-    - the oid, database_name of the oldest active job without a connection,
-      or an empty line;
-    - the total number of jobs started by this agent since process start or
-      in time period if given;    
-    - the total number of jobs completed by this agent since process start or
-      in time period if given;
-    - the total number of jobs completed by this agent since process start or
-      in time period if given that ended in a Failure.
+Here's the ``jobs`` command.  It introduces some new patterns.
+
+- some command modifiers are available as <modifier>:<value>
+
+- several commands have the "queue:" and "agent:" modifiers.
+
+    >>> connection.test_input('async help jobs\n')
+    Show active jobs as of last poll, sorted from newest to oldest.
+    <BLANKLINE>
+        Usage:
+    <BLANKLINE>
+            jobs
+            (returns active jobs as of last poll, newest to oldest)
+    <BLANKLINE>
+            jobs queue:<queue name>
+            (jobs are filtered to those coming from the named queue)
+    <BLANKLINE>
+            jobs agent:<agent name>
+            (jobs are filtered to those coming from agents with given name)
+    <BLANKLINE>
+        "queue:" and "agent:" modifiers may be combined.
+    <BLANKLINE>
+        Example:
+    <BLANKLINE>
+            async jobs queue: agent:main
+            (results filtered to queue named '' and agent named 'main') 
     -> CLOSE
 
-The ``log`` tool gives up to 100 of the last log entries of a given agent.
+    >>> connection.test_input('async jobs\n')
+    [] 
+    -> CLOSE
 
-    >>> connection.test_input('async help log')
-    Get up to 100 of the last log entries for a given agent.
-    
-    Usage: async log <agent name> [count]
-    -> CLOSE  
+The ``jobstats`` command analyzes past polls and job information to come up
+with some potentially useful statistics.  It includes the optional "queue:"
+and "agent:" modifiers.  It also shows some new patterns.
 
-.. [#z3monitor_setup]
+- datetimes are in UTC, in ISO 8601 format.
 
+- The "at:", "before:" and "since:" modifiers are intervals, or poll keys.
+
+- "at:" and "before:" may not be combined.
+
+    >>> connection.test_input('async help jobstats\n')
+    Statistics on historical jobs as of last poll.
+    <BLANKLINE>
+        Usage:
+    <BLANKLINE>
+            jobstats
+            (returns statistics on historical jobs as of last poll)
+    <BLANKLINE>
+            jobstats queue:<queue name>
+            (statistics are filtered to those coming from the named queue)
+    <BLANKLINE>
+            jobstats agent:<agent name>
+            (statistics are filtered to those coming from agents with given name)
+    <BLANKLINE>
+            jobstats at:<poll key or interval>
+            (statistics are collected at or before the poll key or interval)
+    <BLANKLINE>
+            jobstats before:<pollkey or interval>
+            (statistics are collected before the poll key or interval)
+    <BLANKLINE>
+            jobstats since:<pollkey or interval>
+            (statistics are collected since poll key or interval, inclusive)
+    <BLANKLINE>
+        The modifiers "queue:", "agent:", "since:", and one of "at:" or "before:"
+        may be combined.
+    <BLANKLINE>
+        Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+        be replaced with a positive integer, and "D," "H," "M," and "S" are
+        literals standing for "days," "hours," "minutes," and "seconds." 
+        For instance, you might use ``5M`` for five minutes, ``20S`` for
+        twenty seconds, or ``1H30M`` for an hour and a half.
+    <BLANKLINE>
+        Poll keys are the values shown as "key" from the ``poll`` or ``polls``
+        command.
+    <BLANKLINE>
+        Example:
+    <BLANKLINE>
+            async jobstats queue: agent:main since:1H
+            (results filtered to queue named '' and agent named 'main' from now
+             till one hour ago) 
+    -> CLOSE
+
+    >>> connection.test_input('async jobstats\n')
+    {
+        "failed": 0, 
+        "longest active": null, 
+        "longest failed": null, 
+        "longest successful": null, 
+        "shortest active": null, 
+        "shortest failed": null, 
+        "shortest successful": null, 
+        "started": 0, 
+        "statistics end": "2006-08-10T15:44:22.000211Z", 
+        "statistics start": "2006-08-10T15:44:22.000211Z", 
+        "successful": 0, 
+        "unknown": 0
+    } 
+    -> CLOSE
+
+The ``poll`` command uses patterns we've seen above.
+
+    >>> connection.test_input('async help poll\n')
+    Get information about a single poll, defaulting to most recent.
+    <BLANKLINE>
+        Usage:
+    <BLANKLINE>    
+            poll
+            (returns most recent poll)
+    <BLANKLINE>
+            poll at:<poll key or interval>
+            (returns poll at or before the poll key or interval)
+    <BLANKLINE>
+            poll before:<poll key or interval>
+            (returns poll before the poll key or interval)
+    <BLANKLINE>
+        Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+        be replaced with a positive integer, and "D," "H," "M," and "S" are
+        literals standing for "days," "hours," "minutes," and "seconds." 
+        For instance, you might use ``5M`` for five minutes, ``20S`` for
+        twenty seconds, or ``1H30M`` for an hour and a half.
+    <BLANKLINE>
+        Example:
+    <BLANKLINE>
+            async poll at:5M
+            (get the poll information at five minutes ago or before) 
+    -> CLOSE
+
+    >>> connection.test_input('async poll\n')
+    {
+        "key": 6420106068108777167, 
+        "results": {
+            "": {}
+        }, 
+        "time": "2006-08-10T15:44:22.000211Z"
+    } 
+    -> CLOSE
+
+``polls`` does too.
+
+    >>> connection.test_input('async help polls\n')
+    Get information about recent polls, defaulting to most recent.
+    <BLANKLINE>
+        Usage:
+    <BLANKLINE>
+            polls
+            (returns most recent 3 poll)
+    <BLANKLINE>
+            polls at:<poll key or interval>
+            (returns up to 3 polls at or before the poll key or interval)
+    <BLANKLINE>
+            polls before:<poll key or interval>
+            (returns up to 3 polls before the poll key or interval)
+    <BLANKLINE>
+            polls since:<poll key or interval>
+            (returns polls since the poll key or interval, inclusive)
+    <BLANKLINE>
+            polls count <positive integer>
+            (returns the given number of the most recent files)
+    <BLANKLINE>
+        The modifiers "since:", "count:", and one of "at:" or "before:" may
+        be combined.
+    <BLANKLINE>
+        Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+        be replaced with a positive integer, and "D," "H," "M," and "S" are
+        literals standing for "days," "hours," "minutes," and "seconds." 
+        For instance, you might use ``5M`` for five minutes, ``20S`` for
+        twenty seconds, or ``1H30M`` for an hour and a half.
+    <BLANKLINE>
+        Example:
+    <BLANKLINE>
+            async polls before:5M since:10M
+            (get the poll information from 5 to 10 minutes ago) 
+    -> CLOSE
+
+    >>> connection.test_input('async polls\n')
+    [
+        {
+            "key": 6420106068108777167, 
+            "results": {
+                "": {}
+            }, 
+            "time": "2006-08-10T15:44:22.000211Z"
+        }
+    ] 
+    -> CLOSE
+
+.. [#setUp] See the discussion in the other documentation for an explanation of this code.
+
+    >>> from zc.twist import transactionManager, connection
+    >>> import zope.component
+    >>> zope.component.provideAdapter(transactionManager)
+    >>> zope.component.provideAdapter(connection)
+    >>> import ZODB.interfaces
+    >>> zope.component.provideAdapter(
+    ...     transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+    >>> import zope.component
+    >>> import types
+    >>> import zc.async.interfaces
+    >>> import zc.async.job
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.Job,
+    ...     adapts=(types.FunctionType,),
+    ...     provides=zc.async.interfaces.IJob)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.job.Job,
+    ...     adapts=(types.MethodType,),
+    ...     provides=zc.async.interfaces.IJob)
+    ...
+
+    >>> import zc.async.testing
+    >>> reactor = zc.async.testing.Reactor()
+    >>> reactor.start() # this monkeypatches datetime.datetime.now 
+
+    >>> import ZODB.FileStorage
+    >>> storage = ZODB.FileStorage.FileStorage(
+    ...     'zc_async.fs', create=True)
+    >>> from ZODB.DB import DB 
+    >>> db = DB(storage) 
+    >>> conn = db.open()
+    >>> root = conn.root()
+
+    >>> import zc.async.queue
+    >>> import zc.async.interfaces
+    >>> mapping = root[zc.async.interfaces.KEY] = zc.async.queue.Queues()
+    >>> queue = mapping[''] = zc.async.queue.Queue()
+    >>> import transaction
+    >>> transaction.commit()
+
+    >>> from zc.async.instanceuuid import UUID
+    >>> import zope.component
+    >>> zope.component.provideUtility(
+    ...     UUID, zc.async.interfaces.IUUID, '')
+
+    >>> import zc.async.dispatcher
+    >>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
+    >>> dispatcher.activate()
+    >>> reactor.time_flies(1)
+    1
+
+    >>> import zc.async.agent
+    >>> agent = zc.async.agent.Agent()
+    >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
+    >>> transaction.commit()
+
+    >>> import time
+    >>> def wait_for(*jobs, **kwargs):
+    ...     reactor.time_flies(dispatcher.poll_interval) # starts thread
+    ...     # now we wait for the thread
+    ...     for i in range(kwargs.get('attempts', 10)):
+    ...         while reactor.time_passes():
+    ...             pass
+    ...         transaction.begin()
+    ...         for j in jobs:
+    ...             if j.status != zc.async.interfaces.COMPLETED:
+    ...                 break
+    ...         else:
+    ...             break
+    ...         time.sleep(0.1)
+    ...     else:
+    ...         print 'TIME OUT'
+    ...
+
+.. [#z3monitor_setup] This part actually sets up the monitoring.
+
     >>> import zc.ngi.testing
     >>> import zc.z3monitor
 
     >>> connection = zc.ngi.testing.TextConnection()
     >>> server = zc.z3monitor.Server(connection)
 
-    >>> import zc.async.z3monitor
-    >>> import zope.component, zc.z3monitor.interfaces
+    >>> import zc.async.monitor
+    >>> import zope.component
+    >>> import zc.z3monitor.interfaces
     >>> zope.component.provideUtility(
-    ...     zc.async.z3monitor.agent,
+    ...     zc.async.monitor.async,
     ...     zc.z3monitor.interfaces.IZ3MonitorPlugin,
-    ...     'agent')
-    >>> zope.component.provideUtility(
-    ...     zc.async.z3monitor.agents,
-    ...     zc.z3monitor.interfaces.IZ3MonitorPlugin,
-    ...     'agents')
+    ...     'async')
+    >>> zope.component.provideUtility(zc.z3monitor.help,
+    ...     zc.z3monitor.interfaces.IZ3MonitorPlugin, 'help')

Modified: zc.async/branches/dev/src/zc/async/testing.py
===================================================================
--- zc.async/branches/dev/src/zc/async/testing.py	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/testing.py	2008-04-02 03:11:14 UTC (rev 85059)
@@ -41,6 +41,8 @@
 def argh(*args, **kwargs):
     return _datetime(*args, **kwargs)
 
+_datetime.max = _datetime(*old_datetime.max.__reduce__()[1])
+
 def setUpDatetime():
     datetime.datetime = _datetime
     set_now(datetime.datetime(2006, 8, 10, 15, 44, 22, 211, pytz.UTC))

Modified: zc.async/branches/dev/src/zc/async/utils.py
===================================================================
--- zc.async/branches/dev/src/zc/async/utils.py	2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/utils.py	2008-04-02 03:11:14 UTC (rev 85059)
@@ -15,9 +15,9 @@
         return getattr(self._data, name)(*args, **kwargs)
     return wrapper
 
-log = logging.getLogger('zc.async')
+log = logging.getLogger('zc.async.events')
+tracelog = logging.getLogger('zc.async.trace')
 
-
 class Base(persistent.Persistent):
 
     _z_parent__ = parent = None
@@ -112,6 +112,9 @@
     def __init__(self, period, buckets):
         self._data = zope.bforest.periodic.LOBForest(period, count=buckets)
 
+    def clear(self):
+        self._data.clear()
+
     @property
     def period(self):
         return self._data.period

Added: zc.async/branches/dev/src/zc/async/z3tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/z3tests.py	                        (rev 0)
+++ zc.async/branches/dev/src/zc/async/z3tests.py	2008-04-02 03:11:14 UTC (rev 85059)
@@ -0,0 +1,26 @@
+import os
+import unittest
+from zope.testing import doctest, module
+
+import zc.async.tests
+
+def setUp(test):
+    zc.async.tests.modSetUp(test)
+    # make the uuid stable for these tests
+    f = open(os.path.join(
+        os.environ["INSTANCE_HOME"], 'etc', 'uuid.txt'), 'w')
+    f.writelines(('d10f43dc-ffdf-11dc-abd4-0017f2c49bdd',)) # ...random...
+    f.close()
+    zc.async.instanceuuid.UUID = zc.async.instanceuuid.getUUID()
+
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocFileSuite(
+            'monitor.txt',
+            setUp=setUp, tearDown=zc.async.tests.modTearDown,
+            optionflags=doctest.INTERPRET_FOOTNOTES),
+        ))
+
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')



More information about the Checkins mailing list