[Checkins] SVN: zc.async/trunk/ Work mostly on monitoring.

Gary Poster gary at modernsongs.com
Sun Sep 14 22:40:50 EDT 2008


Log message for revision 91156:
  Work mostly on monitoring.
  
  Also got sidetracked into providing better information about thread tear-down failures.
  
  This should be at least three separate checkins. :-(
  
  Pertinent changes excerpted from the changelog:
  
  - monitoring support depends on the new zc.monitor package, which is not Zope
    specific.  This means non-Zope 3 apps can take advantage of the monitoring
    support.  To use it, install the [monitor] extra; this only adds simplejson,
    zc.ngi, and zc.monitor to the basic dependencies.
  
  - Make ftesting try to join worker threads, in addition to the polling thread,
    to eliminate intermittent test-runner warnings in ftests that a thread was
    left behind.  If the threads do not end, inform the user which jobs are not
    letting go.  (Thanks to Patrick Strawderman.)
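
  The tear-down behavior above can be sketched roughly as follows.  This is a
  minimal, hypothetical illustration, not zc.async's actual implementation (the
  real helper is zc.async.testing.tear_down_dispatcher); the names
  ``tear_down_threads`` and ``jobs_by_thread`` are invented here.

```python
import threading

def tear_down_threads(threads, jobs_by_thread, timeout=1.0):
    # Try to join each worker thread; collect the job any stuck thread
    # is still performing, so the user can be told what won't let go.
    # ``jobs_by_thread`` maps thread names to job descriptions (or None
    # when the thread is idle).
    stuck = []
    for t in threads:
        t.join(timeout)
        if t.is_alive():
            stuck.append((t.name, jobs_by_thread.get(t.name)))
    return stuck
```

  A caller would raise an error listing the returned pairs, which is the kind
  of "what jobs are not letting go" report described above.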
  
  Additional changes:
  
  - completed, pending, and in-agent job listings are now produced with a merge
    sort
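
  Merging several already-sorted job listings (pending, per-agent, completed)
  can be sketched with a heap-based merge.  This only illustrates the idea
  behind a helper like zc.async.utils.sortedmerge; it is not that function's
  actual code.

```python
import heapq

def sortedmerge(sources, key):
    # Each source is already ordered by ``key`` (e.g. begin_after for
    # pending jobs), so a heap merge yields one combined, ordered stream
    # without re-sorting everything up front.
    return heapq.merge(*sources, key=key)
```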
  
  - asyncdb "firstjob" was replaced with "nextpending" and "lastcompleted" after
    I realized that the definition of "first" was too confusing--differed, in
    fact--in context.
  
  - asyncdb functions now have complete docstrings
  
  - jobstats and jobs now have a ``display`` argument that lets you see job
    details or reprs, rather than the integer OIDs.  Needs test.
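
  The ``display`` dispatch can be sketched like this.  It is a simplified
  stand-in: the real code returns generators and builds the "details" case
  from a much fuller per-job summary.

```python
def format_jobs(jobs, display='default'):
    # 'default' leaves jobs as-is, 'repr' substitutes their reprs, and
    # 'details' builds a small summary mapping per job (standing in for
    # the fuller dictionary the real summary helper produces).
    display = display.lower()
    if display == 'default':
        return list(jobs)
    elif display == 'repr':
        return [repr(j) for j in jobs]
    elif display == 'details':
        return [{'repr': repr(j)} for j in jobs]
    raise ValueError('unknown value for "display": '
                     'must be one of "default," "repr," or "details."')
```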
  
  - the job detail listing, now usable for several asyncdb commands, uses repr
    for args and kwargs.  This lets you see the actual reprs of persistent
    object arguments, rather than the more tightly controlled version used
    before.
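
  In other words, the summary stores reprs rather than the argument values
  themselves.  A minimal sketch (``summarize_call`` is a hypothetical name):

```python
def summarize_call(args, kwargs):
    # Store reprs so that persistent-object arguments display their real
    # reprs in listings, instead of the objects themselves.
    return {'args': [repr(a) for a in args],
            'kwargs': dict((k, repr(v)) for k, v in kwargs.items())}
```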
  
  - renamed zc.async.testing.shut_down_and_wait to
    zc.async.testing.tear_down_dispatcher
  
  - fixed a problem with the testing helper print_logs that made it too chatty
  
  Still need a few more tests, and still have big doc aspirations that I'll
  probably need to compromise on. :-/
  
  

Changed:
  U   zc.async/trunk/buildout.cfg
  U   zc.async/trunk/setup.py
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/catastrophes.txt
  U   zc.async/trunk/src/zc/async/catastrophes_revisited.txt
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/dispatcher.zcml
  U   zc.async/trunk/src/zc/async/ftesting.py
  U   zc.async/trunk/src/zc/async/ftesting.txt
  U   zc.async/trunk/src/zc/async/monitor.txt
  U   zc.async/trunk/src/zc/async/monitordb.py
  U   zc.async/trunk/src/zc/async/monitordb.txt
  A   zc.async/trunk/src/zc/async/monitortests.py
  U   zc.async/trunk/src/zc/async/testing.py
  U   zc.async/trunk/src/zc/async/utils.py
  U   zc.async/trunk/src/zc/async/z3tests.py

-=-
Modified: zc.async/trunk/buildout.cfg
===================================================================
--- zc.async/trunk/buildout.cfg	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/buildout.cfg	2008-09-15 02:40:49 UTC (rev 91156)
@@ -2,6 +2,8 @@
 parts =
     interpreter
     test
+    monitorinterpreter
+    mtest
     z3interpreter
     z3test
     tags
@@ -18,6 +20,11 @@
 defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1 --auto-color'.split()
 working-directory = ${buildout:directory}
 
+[mtest]
+recipe = zc.recipe.testrunner
+eggs = zc.async [monitor]
+defaults = "--tests-pattern monitortests --exit-with-status -1 --auto-color".split()
+
 [z3test]
 recipe = zc.recipe.testrunner
 eggs = zc.async [z3]
@@ -32,6 +39,11 @@
        Pygments
 interpreter = py
 
+[monitorinterpreter]
+recipe = zc.recipe.egg
+eggs = zc.async [monitor]
+interpreter = mpy
+
 [z3interpreter]
 recipe = zc.recipe.egg
 eggs = zc.async [z3]

Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/setup.py	2008-09-15 02:40:49 UTC (rev 91156)
@@ -109,6 +109,10 @@
         'rwproperty',
         ],
     extras_require={
+        'monitor': [
+            'zc.monitor',
+            'simplejson',
+            ],
         'z3':[
             'zc.z3monitor',
             'zope.security',

Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-09-15 02:40:49 UTC (rev 91156)
@@ -53,10 +53,6 @@
   log, and give up on the job.  Write conflict errors on the job should
   protect us from the edge cases in this story.
 
-- Make ftesting try to join worker threads, in addition to polling thread,
-  to try to eliminate intermittent test-runner warnings in ftests that a
-  thread is left behind.  XXX test
-
 - The dispatcher's ``getActiveJobs`` method now actually tells you information
   about what's going on in the threads at this instant, rather than what's
   going on in the database.  The poll's ``active jobs`` keys continues to
@@ -81,6 +77,16 @@
 - Tried to be significantly reduce the chance of spurious timing errors in the
   tests, at the expense of causing the tests to take longer to run.
 
+- monitoring support depends on the new zc.monitor package, which is not Zope
+  specific.  This means non-Zope 3 apps can take advantage of the monitoring
+  support.  To use it, install the [monitor] extra; this only adds simplejson,
+  zc.ngi, and zc.monitor to the basic dependencies.
+
+- Make ftesting try to join worker threads, in addition to polling thread,
+  to try to eliminate intermittent test-runner warnings in ftests that a
+  thread is left behind.  If the threads do not end, inform the user what jobs
+  are not letting go.  (thanks to Patrick Strawderman)
+
 1.4.1 (2008-07-30)
 ==================
 

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-09-15 02:40:49 UTC (rev 91156)
@@ -1,13 +1,16 @@
+- need to show in monitordb test that completed, in-agent, and pending jobs
+  do a merge sort (esp. completed, because this is important to the semantics
+  of lastcompleted).
+- need test for "display" in asyncdb jobstats and jobs
+
 - fix up tips so that it looks better
 - write a zc.buildout/grok quickstart
-- zc.monitor instead of zc.z3monitor
-- improvements to thread teardown
-- docstrings for monitordb functions
-- waiting thread testing notification improvements (Patrick)
 
+
 Improvements
 
 - queues should be pluggable like agent with filter
+- consider agent sort function for jobs to implement priority queues
 
 More docs:
 

Modified: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/catastrophes.txt	2008-09-15 02:40:49 UTC (rev 91156)
@@ -878,11 +878,11 @@
 .. [#cleanup1]
 
     >>> lock.release()
-    >>> zc.async.testing.shut_down_and_wait(old_dispatcher)
+    >>> zc.async.testing.tear_down_dispatcher(old_dispatcher)
 
 .. [#cleanup2]
 
     >>> lock.release()
-    >>> zc.async.testing.shut_down_and_wait(dispatcher)
-    >>> zc.async.testing.shut_down_and_wait(alt_dispatcher)
+    >>> zc.async.testing.tear_down_dispatcher(dispatcher)
+    >>> zc.async.testing.tear_down_dispatcher(alt_dispatcher)
     >>> time.sleep = old_sleep

Modified: zc.async/trunk/src/zc/async/catastrophes_revisited.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes_revisited.txt	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/catastrophes_revisited.txt	2008-09-15 02:40:49 UTC (rev 91156)
@@ -168,5 +168,5 @@
 
 Now we'll shut down the dispatchers.
 
-    >>> zc.async.testing.shut_down_and_wait(dispatcher)
-    >>> zc.async.testing.shut_down_and_wait(alt_dispatcher)
+    >>> zc.async.testing.tear_down_dispatcher(dispatcher)
+    >>> zc.async.testing.tear_down_dispatcher(alt_dispatcher)

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-09-15 02:40:49 UTC (rev 91156)
@@ -62,7 +62,8 @@
         return self._size
 
     def perform_thread(self):
-        thread_id = thread.get_ident()
+        thread_id = str(thread.get_ident())
+        threading.currentThread().setName(thread_id)
         self.jobids[thread_id] = None
         zc.async.local.dispatcher = self.dispatcher
         zc.async.local.name = self.name # this is the name of this pool's agent
@@ -75,7 +76,7 @@
                 info['thread'] = thread_id
                 info['started'] = datetime.datetime.utcnow()
                 zc.async.utils.tracelog.info(
-                    'starting in thread %d: %s',
+                    'starting in thread %s: %s',
                     info['thread'], info['call'])
                 backoff = self.initial_backoff
                 conflict_retry_count = 0
@@ -178,7 +179,7 @@
                     zc.async.local.job = None # also in job (here for paranoia)
                     transaction.abort() # (also paranoia)
                 zc.async.utils.tracelog.info(
-                    'completed in thread %d: %s',
+                    'completed in thread %s: %s',
                     info['thread'], info['call'])
                 self.jobids[thread_id] = None
                 job_info = self.queue.get()
@@ -495,6 +496,10 @@
             raise ValueError('not activated')
         self.activated = None # "in progress"
         try:
+            # Note: we do not want to clear jobs and polls, because they can
+            # be helpful diagnostic information (particularly in the use of
+            # zc.async.testing.tear_down_dispatcher to identify jobs that won't
+            # stop).
             transaction.begin()
             try:
                 identifier = 'cleanly deactivating UUID %s' % (self.UUID,)

Modified: zc.async/trunk/src/zc/async/dispatcher.zcml
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.zcml	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/dispatcher.zcml	2008-09-15 02:40:49 UTC (rev 91156)
@@ -2,9 +2,9 @@
 <configure xmlns="http://namespaces.zope.org/zope">
     <include file="configure.zcml" />
     <utility component=".monitor.async"
-             provides="zc.z3monitor.interfaces.IZ3MonitorPlugin"
+             provides="zc.monitor.interfaces.IMonitorPlugin"
              name="async" />
     <utility component=".monitordb.asyncdb"
-             provides="zc.z3monitor.interfaces.IZ3MonitorPlugin"
+             provides="zc.monitor.interfaces.IMonitorPlugin"
              name="asyncdb" />
 </configure>

Modified: zc.async/trunk/src/zc/async/ftesting.py
===================================================================
--- zc.async/trunk/src/zc/async/ftesting.py	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/ftesting.py	2008-09-15 02:40:49 UTC (rev 91156)
@@ -47,5 +47,5 @@
         logger = logging.getLogger('zc.async')
         logger.removeHandler(dispatcher._debug_handler)
         del dispatcher._debug_handler
-    zc.async.testing.shut_down_and_wait(dispatcher)
+    zc.async.testing.tear_down_dispatcher(dispatcher)
     zc.async.dispatcher.clear()

Modified: zc.async/trunk/src/zc/async/ftesting.txt
===================================================================
--- zc.async/trunk/src/zc/async/ftesting.txt	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/ftesting.txt	2008-09-15 02:40:49 UTC (rev 91156)
@@ -19,7 +19,9 @@
   you need to make sure that ftests do not use dispatchers started in the
   application. Start up ftest (or integration test) dispatchers separately,
   using ``zc.async.ftesting.setUp``, and then tear down after the tests are
-  done with ``zc.async.ftesting.tearDown``.
+  done with ``zc.async.ftesting.tearDown``.  The ``tearDown`` feature tries
+  to shut down all threads, and tries to let you know what job was running in
+  threads that couldn't be cleanly stopped.
 
   The ftest dispatcher polls every tenth of a second, so you shouldn't need to
   wait long for you job to get started in your tests.
@@ -31,6 +33,8 @@
 - If you don't want to dig into guts in your functional tests to use the tools
   described in the previous point, consider making a view to check on job
   status using a data structure like JSON, and looking at that in your tests.
+  Alternatively, investigate the tools in monitordb.py--although the tools
+  were created for zc.monitor, they can still be used effectively in Python.
 
 - The ``setUp`` code by default sends critical log messages to __stdout__ so it
   can help diagnose why a callback might never complete.
@@ -201,6 +205,49 @@
     >>> dispatcher.activated
     False
 
+ftesting.tearDown attempts to join all threads in the dispatchers' queues, but
+will raise an error if a job or dispatcher fails to shut down.
+
+If the thread is performing a job, the error informs you what job is being
+performed.
+
+    >>> zc.async.ftesting.setUp()
+    >>> _ = transaction.begin()
+    >>> queue = root[zc.async.interfaces.KEY]['']
+    >>> def bad_job():
+    ...     zc.async.testing.time_sleep(4)
+    >>> job = queue.put(bad_job)
+    >>> transaction.commit()
+    >>> zc.async.testing.wait_for_start(job)
+    >>> zc.async.ftesting.tearDown() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    TearDownDispatcherError: 
+    Job in pool 'main' failed to stop:
+      <zc.async.job.Job (oid ..., db ...) ``zc.async.doctest_test.bad_job()``>
+    >>> zc.async.testing.wait_for_result(job)
+
+If the dispatcher isn't shutting down for some reason, the UUID is given.
+
+    >>> zc.async.ftesting.tearDown()
+    >>> zc.async.ftesting.setUp()
+    >>> dispatcher = zc.async.dispatcher.get()
+    >>> def noop(*kw):
+    ...     pass
+    >>> original_stop = dispatcher.reactor.stop
+    >>> dispatcher.reactor.stop = noop
+    >>> zc.async.ftesting.tearDown() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    TearDownDispatcherError: 
+    Dispatcher (..., ...) failed to stop.
+
+Let's restore the original reactor.stop method and call tearDown again, which
+will work this time.
+
+    >>> dispatcher.reactor.stop = original_stop
+    >>> zc.async.ftesting.tearDown()
+
 Also worth noting, as mentioned in the summary, is that you should use
 zope.app.testing version 3.4.2 or higher to avoid getting spurious,
 intermittent bug reports from ftests that use zc.async.
@@ -214,4 +261,4 @@
     >>> storage.cleanup()
 
     >>> del storage # we just do this to not confuse our own tearDown code.
-    >>> del globals()['getRootFolder'] # clean up globals
+    >>> del globals()['getRootFolder'] # clean up globals; probably unnecessary

Modified: zc.async/trunk/src/zc/async/monitor.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitor.txt	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/monitor.txt	2008-09-15 02:40:49 UTC (rev 91156)
@@ -2,11 +2,11 @@
 ======================
 
 A process's zc.async dispatcher [#setUp]_ can be monitored in-process via
-zc.z3monitor plugins.  Let's imagine we have a connection over which we
-can send text messages to the monitor server [#z3monitor_setup]_.
+zc.monitor plugins.  Let's imagine we have a connection over which we
+can send text messages to the monitor server [#monitor_setup]_.
 
 All monitoring is done through the ``async`` command.  Here is its
-description, using the zc.z3monitor ``help`` command.
+description, using the zc.monitor ``help`` command.
 
     >>> connection.test_input('help async\n')
     Help for async:
@@ -714,20 +714,20 @@
     >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
     >>> transaction.commit()
 
-.. [#z3monitor_setup] This part actually sets up the monitoring.
+.. [#monitor_setup] This part actually sets up the monitoring.
 
     >>> import zc.ngi.testing
-    >>> import zc.z3monitor
+    >>> import zc.monitor
 
     >>> connection = zc.ngi.testing.TextConnection()
-    >>> server = zc.z3monitor.Server(connection)
+    >>> server = zc.monitor.Server(connection)
 
     >>> import zc.async.monitor
     >>> import zope.component
-    >>> import zc.z3monitor.interfaces
+    >>> import zc.monitor.interfaces
     >>> zope.component.provideUtility(
     ...     zc.async.monitor.async,
-    ...     zc.z3monitor.interfaces.IZ3MonitorPlugin,
+    ...     zc.monitor.interfaces.IMonitorPlugin,
     ...     'async')
-    >>> zope.component.provideUtility(zc.z3monitor.help,
-    ...     zc.z3monitor.interfaces.IZ3MonitorPlugin, 'help')
+    >>> zope.component.provideUtility(zc.monitor.help,
+    ...     zc.monitor.interfaces.IMonitorPlugin, 'help')

Modified: zc.async/trunk/src/zc/async/monitordb.py
===================================================================
--- zc.async/trunk/src/zc/async/monitordb.py	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/monitordb.py	2008-09-15 02:40:49 UTC (rev 91156)
@@ -1,5 +1,6 @@
 import datetime
 import fnmatch
+import itertools
 import re
 from uuid import UUID # we use this non-standard import spelling because
 # ``uuid`` is frequently an argument
@@ -13,6 +14,7 @@
 import zc.async.dispatcher
 import zc.async.interfaces
 import zc.async.monitor
+import zc.async.utils
 
 _available_states = frozenset(
     ('pending', 'assigned', 'active', 'callbacks', 'completed', 'succeeded',
@@ -93,14 +95,64 @@
         queue = re.compile(fnmatch.translate(queue)).match
     if agent is not None:
         agent = re.compile(fnmatch.translate(agent)).match
+    sources = []
+    if pending:
+        def pending_source(q, agent_filters, ignore_agent_filters):
+            for j in q:
+                if not ignore_agent_filters:
+                    for f in agent_filters:
+                        if f(j):
+                            break # this is a positive match
+                    else:
+                        continue
+                for f in filters:
+                    if not f(j):
+                        break # this is a negative match
+                else:
+                    yield j
+        def pending_key(job):
+            return job.begin_after.isoformat()
+        pending_sources = []
+        sources.append((pending_sources, pending_key))
+    if agent_states:
+        def agent_source(a):
+            for j in a:
+                if j.status not in agent_states:
+                    continue
+                for f in filters:
+                    if not f(j):
+                        break
+                else:
+                    yield j
+        def agent_key(job):
+            return (job.active_start or job.begin_after).isoformat()
+        agent_sources = []
+        sources.append((agent_sources, agent_key))
+    if completed:
+        def completed_source(a):
+            for j in a.completed:
+                if completed!='completed':
+                    is_failure = isinstance(
+                        j.result, twisted.python.failure.Failure)
+                    if (completed=='succeeded' and is_failure or
+                        completed=='failed' and not is_failure):
+                        continue
+                for f in filters:
+                    if not f(j):
+                        break
+                else:
+                    yield j
+        def completed_key(job):
+            return job.key # == reverse int of job.initial_callbacks_end
+        completed_sources = []
+        sources.append((completed_sources, completed_key))
     queues = conn.root()[zc.async.interfaces.KEY]
     for q_name, q in queues.items():
         if queue and not queue(q_name):
             continue
         agent_filters = []
         ignore_agent_filters = agent is None and uuid is None
-        if (assigned or active or callbacks or completed or
-            pending and not ignore_agent_filters):
+        if (agent_states or completed or pending and not ignore_agent_filters):
             if uuid is None:
                 das = q.dispatchers.values()
             else:
@@ -124,54 +176,26 @@
                                     '(%s : %s : %s)' %
                                     (q_name, da.UUID, a_name))
                     if agent_states:
-                        for j in a:
-                            if j.status not in agent_states:
-                                continue
-                            for f in filters:
-                                if not f(j):
-                                    break
-                            else:
-                                yield j
+                        agent_sources.append(agent_source(a))
                     if completed:
-                        for j in a.completed:
-                            if completed!='completed':
-                                is_failure = isinstance(
-                                    j.result, twisted.python.failure.Failure)
-                                if (completed=='succeeded' and is_failure or
-                                    completed=='failed' and not is_failure):
-                                    continue
-                            for f in filters:
-                                if not f(j):
-                                    break
-                            else:
-                                yield j
-        if pending:
-            if not agent or agent_filters:
-                for j in q:
-                    if not ignore_agent_filters:
-                        for f in agent_filters:
-                            if f(j):
-                                break # this is a positive match
-                        else:
-                            continue
-                    for f in filters:
-                        if not f(j):
-                            break # this is a negative match
-                    else:
-                        yield j
+                        completed_sources.append(completed_source(a))
+        if pending and (not agent or agent_filters):
+            pending_sources.append(
+                pending_source(q, agent_filters, ignore_agent_filters))
+    return itertools.chain(
+        *(zc.async.utils.sortedmerge(s, key) for s, key in sources))
 
 def jobs(context, *states, **kwargs):
     """Return jobs in one or more states.
 
-    Jobs are identified by integer OID and database name.  These identifiers
-    can be used with the "asyncdb job" command to get details about the jobs.
-    The integer OIDs can be used in a database connection to get the job with
-    ``connection.get(ZODB.utils.p64(INTEGER_OID))``.
+    By default, jobs are identified by integer OID and database name.  These
+    identifiers can be used with the "asyncdb job" command to get details about
+    the jobs. The integer OIDs can be used in a database connection to get the
+    job with ``connection.get(ZODB.utils.p64(INTEGER_OID))``.  For other
+    display options for jobs, see the "display" optional argument.
+    
+    After the arguments list, this description concludes with usage examples.
 
-    The asyncdb commands "jobs," "count," "jobstats," and "firstjob" all share
-    the same arguments, which are described below; after which usage examples
-    for this command are listed.
-
     Arguments
     =========
     
@@ -279,7 +303,12 @@
       will not affect the value that this filter uses.
     
       This is based on a job's ``initial_callbacks_end`` attribute.
-    
+
+    - "display": By default, or with a "default" value, jobs are identified
+      with integer OID and database name.  If the display value is "repr,"
+      reprs of the jobs are used instead.  If the display value is "details,"
+      a dictionary of details is used for each job.
+
     Usage Examples
     ==============
     
@@ -318,15 +347,23 @@
       and one minute ago."  (This also shows that the order of "before" and
       "since" do not matter.)
     """
-    return _jobs(context, states, **kwargs)
+    display = kwargs.pop('display', 'default').lower()
+    res = _jobs(context, states, **kwargs)
+    if display == 'default':
+        return res
+    elif display == 'repr':
+        return (repr(j) for j in res)
+    elif display == 'details':
+        return (jobsummary(j) for j in res)
+    else:
+        raise ValueError('unknown value for "display": '
+                         'must be one of "default," "repr," or "details."')
 
 def count(context, *states, **kwargs):
     """Count jobs in one or more states.
+    
+    After the arguments list, this description concludes with usage examples.
 
-    The asyncdb commands "jobs," "count," "jobstats," and "firstjob" all share
-    the same arguments, which are described below; after which usage examples
-    for this command are listed.
-
     Arguments
     =========
     
@@ -487,15 +524,26 @@
 def jobstats(context, *states, **kwargs):
     """Return statistics about jobs in one or more states.
 
-    XXX describe statistics
+    The report shows the following statistics.
+    
+    - The number of jobs that match the search in each of these states:
+      "pending," "assigned," "active," "callbacks," "succeeded," and "failed".
 
-    Jobs are identified by integer OID and database name.  These identifiers
-    can be used with the "asyncdb job" command to get details about the jobs.
+    - "longest wait" and "shortest wait" give the wait duration and identifier
+      of the job with the longest and shortest wait interval, respectively.
 
-    The asyncdb commands "jobs," "count," "jobstats," and "firstjob" all share
-    the same arguments, which are described below; after which usage examples
-    for this command are listed.
+    - "longest active" and "shortest active" give the active duration and
+      identifier of the job with the longest and shortest active duration,
+      respectively.
 
+    By default, jobs are identified by integer OID and database name.  These
+    identifiers can be used with the "asyncdb job" command to get details about
+    the jobs. The integer OIDs can be used in a database connection to get the
+    job with ``connection.get(ZODB.utils.p64(INTEGER_OID))``.  Alternatively,
+    for other display options for jobs, see the "display" optional argument.
+    
+    After the arguments list, this description concludes with usage examples.
+
     Arguments
     =========
     
@@ -603,6 +651,11 @@
       will not affect the value that this filter uses.
     
       This is based on a job's ``initial_callbacks_end`` attribute.
+
+    - "display": By default, or with a "default" value, jobs are identified
+      with integer OID and database name.  If the display value is "repr,"
+      reprs of the jobs are used instead.  If the display value is "details,"
+      a dictionary of details is used for each job.
     
     Usage Examples
     ==============
@@ -646,6 +699,16 @@
          'succeeded': 0, 'failed': 0}
     longest_wait = longest_active = None
     shortest_wait = shortest_active = None
+    display = kwargs.pop('display', 'default').lower()
+    if display == 'default':
+        job_display = lambda j: j
+    elif display == 'repr':
+        job_display = lambda j: j is not None and repr(j) or j
+    elif display == 'details':
+        job_display = lambda j: j is not None and jobsummary(j) or j
+    else:
+        raise ValueError('unknown value for "display": '
+                         'must be one of "default," "repr," or "details."')
     for j in _jobs(context, states, **kwargs):
         status = j.status 
         if status == zc.async.interfaces.COMPLETED:
@@ -676,10 +739,22 @@
         if (shortest_wait is None or
             shortest_wait[0] < wait):
             shortest_wait = wait, j
-    d['longest wait'] = longest_wait
-    d['longest active'] = longest_active
-    d['shortest wait'] = shortest_wait
-    d['shortest active'] = shortest_active
+    d['longest wait'] = (
+        longest_wait is not None and
+        (longest_wait[0], job_display(longest_wait[1])) or
+        longest_wait)
+    d['longest active'] = (
+        longest_active is not None and
+        (longest_active[0], job_display(longest_active[1])) or
+        longest_active)
+    d['shortest wait'] = (
+        shortest_wait is not None and
+        (shortest_wait[0], job_display(shortest_wait[1])) or
+        shortest_wait)
+    d['shortest active'] = (
+        shortest_active is not None and
+        (shortest_active[0], job_display(shortest_active[1])) or
+        shortest_active)
     return d
 
 def jobsummary(job):
@@ -711,8 +786,8 @@
     else:
         queue = None
     return {'repr': repr(job),
-            'args': list(job.args),
-            'kwargs': dict(job.kwargs),
+            'args': list(repr(a) for a in job.args),
+            'kwargs': dict((k, repr(v)) for k, v in job.kwargs.items()),
             'begin after': job.begin_after,
             'active start': job.active_start,
             'active end': job.active_end,
@@ -747,57 +822,218 @@
     return job.result.getTraceback(detail=detail)
 
 def job(context, oid, database=None):
-    """Return summary of job identified by integer oid."""
+    """Return details of job identified by integer oid.
+    
+    The result includes the following information:
+    
+    - "active": how long the job was,or has been active.
+
+    - "active end": when the job ended its work (before callbacks).
+
+    - "active start": when the job started its work.
+
+    - "agent": the name of the agent that performed this job.
+
+    - "args": the standard repr of each argument to this job.
+
+    - "begin after": when the job was requested to begin.
+
+    - "callbacks": identifiers of the callbacks to this job.
+
+    - "dispatcher": the UUID of the dispatcher that performed this job.
+
+    - "failed": whether the job failed (raised an unhandled exception).
+
+    - "initial callbacks end": when the callbacks were first completed.
+
+    - "kwargs": standard reprs of each keyword argument to this job.
+
+    - "queue": the name of the queue that performed this job.
+
+    - "quota names": the quota names of this job.
+
+    - "repr": a repr of this job (includes its integer OID and database name).
+
+    - "result": a repr of the result of the job; OR a brief traceback.
+
+    - "status": the status of the job.
+
+    - "wait": how long the job was, or has been waiting.
+
+    Times are in UTC.
+    """
     return jobsummary(_get_job(context, oid, database))
 
-def firstjob(context, *states, **kwargs):
-    """Return summary of first job found matching given filters.
+def nextpending(context, **kwargs):
+    """Return details of the next job in queue to be performed.
+    
+    The result includes the following information:
+    
+    - "active": how long the job was,or has been active.
 
-    XXX describe what "first" means for different states.
+    - "active end": when the job ended its work (before callbacks).
 
-    The asyncdb commands "jobs," "count," "jobstats," and "firstjob" all share
-    the same arguments, which are described below; after which usage examples
-    for this command are listed.
+    - "active start": when the job started its work.
 
+    - "agent": the name of the agent that performed this job.
+
+    - "args": the standard repr of each argument to this job.
+
+    - "begin after": when the job was requested to begin.
+
+    - "callbacks": identifiers of the callbacks to this job.
+
+    - "dispatcher": the UUID of the dispatcher that performed this job.
+
+    - "failed": whether the job failed (raised an unhandled exception).
+
+    - "initial callbacks end": when the callbacks were first completed.
+
+    - "kwargs": standard reprs of each keyword argument to this job.
+
+    - "queue": the name of the queue that performed this job.
+
+    - "quota names": the quota names of this job.
+
+    - "repr": a repr of this job (includes its integer OID and database name).
+
+    - "result": a repr of the result of the job; OR a brief traceback.
+
+    - "status": the status of the job.
+
+    - "wait": how long the job was, or has been waiting.
+
+    Times are in UTC.
+    
+    Usage examples follow the arguments list.
+
     Arguments
     =========
-    
-    States
-    ------
 
-    You must provide at least one of the following states.
+    Optional Arguments
+    ------------------
 
-    - "pending": the job is in a queue, waiting to be started.
+    You can filter your results with a number of optional arguments.
+
+    "Shell-style glob wildcards," as referred to in this list, are "?", "*",
+    "[SEQUENCE]", and "[!SEQUENCE]", as described in
+    http://docs.python.org/lib/module-fnmatch.html .
     
-    - "assigned": a dispatcher has claimed the job and assigned it to one of
-      its worker threads.  Work has not yet begun.  Jobs are in this state very
-      briefly.
+    A "duration-based filter" described in this list accepts an argument that
+    is of the form "sinceINTERVAL", "beforeINTERVAL", or
+    "sinceINTERVAL,beforeINTERVAL" (no space!).  The "INTERVAL"s are of the
+    form ``[nD][nH][nM][nS]``, where "n" should be replaced with a positive
+    integer, and "D," "H," "M," and "S" are literals standing for "days,"
+    "hours," "minutes," and "seconds." For instance, you might use ``5M`` for
+    five minutes, ``20S`` for twenty seconds, or ``1H30M`` for an hour and a
+    half.  Thus "before30M" would mean "thirty minutes ago or older." 
+    "since45S" would mean "45 seconds ago or newer."  "since1H,before30M" would
+    mean "between thirty minutes and an hour ago."  Note that reversing the two
+    components in the last example--"before30M,since1H"--is equivalent.
     
-    - "active": A worker thread is performing this job.
+
+    - "callable": filters by callable name.  Supports shell-style glob
+      wildcards.  If you do not include a "." in the string, it matches only on
+      the callable name.  If you include a ".", it matches on the
+      fully-qualified name (that is, including the module).
     
-    - "callbacks": the job's work is ended, and the thread is performing the
-      callbacks, if any.
+    - "queue": filters by queue name.  Supports shell-style glob wildcards.
     
-    - "completed": the job and its callbacks are completed.  Completed jobs
-      stay in the database for only a certain amount of time--between seven and
-      eight days in the default agent implementation.
+    - "agent": filters by agent name.  Supports shell-style glob wildcards.
+      This restricts the jobs to the ones that the agent could perform,
+      according to its filter.
+    
+    - "uuid": filters by UUID string, or the special marker "THIS", indicating
+      the UUID of the current process' dispatcher.  Supports shell-style glob
+      wildcards.  This restricts the jobs to the ones that the agents for that
+      dispatcher could perform, according to their filters.
+    
+    - "requested_start": a duration-based filter for when the job was requested
+      to start.
+      
+      Note that, if a job is not given an explicit start time, the time when it
+      was added to a queue is used.  This is based on a job's ``begin_after``
+      attribute.
+    
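The callable-name matching rule described above (a pattern without a "." matches only the bare callable name; a pattern with a "." matches the fully-qualified "module.name" string) can be sketched with the stdlib's ``fnmatch``. ``matches_callable`` is an illustrative helper, not zc.async's implementation:

```python
import fnmatch

def matches_callable(pattern, fullname):
    # A dotted pattern is matched against the fully-qualified name;
    # an undotted pattern only against the final name component.
    if '.' in pattern:
        return fnmatch.fnmatch(fullname, pattern)
    shortname = fullname.rsplit('.', 1)[-1]
    return fnmatch.fnmatch(shortname, pattern)
```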
+    Usage Examples
+    ==============
+    
+    Here are some examples of the command.
 
-    - "succeeded": the job completed successfully (that is, without raising an
-      unhandled exception, and without returning an explicit
-      twisted.python.failure.Failure).  This is a subset of "completed,"
-      described above.
+        asyncdb nextpending
+        (gives details of the next pending job)
+        
+        asyncdb nextpending agent:instance5
+        (gives details of the next pending job that any of the "instance5"
+        agents could work on)
 
-     - "failed": the job completed by raising an unhandled exception or by
-       explicitly returning a twisted.python.failure.Failure.  This is a subset
-       of "completed," described above.
+        asyncdb nextpending callable:import_*
+        (gives details about the next pending job with a callable beginning
+        with the name "import_")
 
-    You may use no more than one of the states "completed," "succeeded," and
-    "failed".
+        asyncdb nextpending requested_start:before1M
+        (gives details of the next pending job that was requested to begin
+        at least one minute ago)
+    """
+    unsupported = set(['start', 'end', 'callbacks_completed']).intersection(
+        kwargs)
+    if unsupported:
+        raise ValueError('unsupported filters: %s' %
+                         (', '.join(sorted(unsupported)),))
+    for j in _jobs(context, ('pending',), **kwargs):
+        return jobsummary(j)
+    return None
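``nextpending`` and ``lastcompleted`` share a small idiom here: loop over a generator and return a summary of the first item, falling back to ``None`` when the generator yields nothing. As a standalone sketch (the helper name is hypothetical):

```python
def first_or_none(iterable, transform=lambda x: x):
    # Returns transform(first item) without exhausting the iterable,
    # or None if the iterable is empty -- the same shape as the
    # "for j in _jobs(...): return jobsummary(j); return None" pattern.
    for item in iterable:
        return transform(item)
    return None
```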
 
+def lastcompleted(context, **kwargs):
+    """Return details of the most recently completed job.
+    
+    The result includes the following information:
+    
+    - "active": how long the job was, or has been active.
+
+    - "active end": when the job ended its work (before callbacks).
+
+    - "active start": when the job started its work.
+
+    - "agent": the name of the agent that performed this job.
+
+    - "args": the standard repr of each argument to this job.
+
+    - "begin after": when the job was requested to begin.
+
+    - "callbacks": identifiers of the callbacks to this job.
+
+    - "dispatcher": the UUID of the dispatcher that performed this job.
+
+    - "failed": whether the job failed (raised an unhandled exception).
+
+    - "initial callbacks end": when the callbacks were first completed.
+
+    - "kwargs": standard reprs of each keyword argument to this job.
+
+    - "queue": the name of the queue that performed this job.
+
+    - "quota names": the quota names of this job.
+
+    - "repr": a repr of this job (includes its integer OID and database name).
+
+    - "result": a repr of the result of the job; OR a brief traceback.
+
+    - "status": the status of the job.
+
+    - "wait": how long the job was, or has been waiting.
+
+    Times are in UTC.
+    
+    Usage examples follow the arguments list.
+
+    Arguments
+    =========
+
     Optional Arguments
     ------------------
 
-    You can further filter your results with a number of optional arguments.
+    You can filter your results with a number of optional arguments.
 
     "Shell-style glob wildcards," as referred to in this list, are "?", "*",
     "[SEQUENCE]", and "[!SEQUENCE]", as described in
@@ -872,21 +1108,17 @@
     
     Here are some examples of the command.
 
-        asyncdb firstjob pending
-        (describes the first (next-to-be-processed) pending jobs)
+        asyncdb lastcompleted
+        (gives details about the most recently completed job)
         
-        asyndb job active agent:instance5
-        (describes the first job that any agent named instance5
-        is working on; "first" doesn't mean much here.)
-        
-        asyndb job pending agent:instance5
-        (describes the first (next-to-be-processed) pending jobs that agents
-        named "instance5" could perform)
+        asyncdb lastcompleted agent:instance5
+        (gives details about the most recently completed job that any agent
+        named "instance5" has worked on)
 
-        asyncdb job completed end:since1H callable:import_*
-        (describes the first (most recently completed) completed job that ended
-        within the last hour that called a function or method that began with
-        the string "import_")
+        asyncdb lastcompleted end:since1H callable:import_*
+        (gives details about the most recently completed job that ended within
+        the last hour that called a function or method that began with the
+        string "import_")
 
     Here are some examples of how the duration-based filters work.
     
@@ -905,7 +1137,7 @@
       and one minute ago."  (This also shows that the order of "before" and
       "since" do not matter.)
     """
-    for j in _jobs(context, states, **kwargs):
+    for j in _jobs(context, ('completed',), **kwargs):
         return jobsummary(j)
     return None
 
@@ -920,7 +1152,10 @@
             if da.activated]
 
 def status(context, queue=None, agent=None, uuid=None):
-    """Return status of the agents of all queues and all active UUIDs."""
+    """Return status of the agents of all queues and all active UUIDs.
+    
+    Times are in UTC.
+    """
     conn = ZODB.interfaces.IConnection(context)
     if uuid is not None:
         if uuid.upper() == 'THIS':
@@ -983,8 +1218,8 @@
         return 'Unknown async tool'
     return f.__doc__
 
-for f in (
-    count, jobs, job, firstjob, traceback, jobstats, UUIDs, status, help):
+for f in (count, jobs, job, nextpending, lastcompleted, traceback, jobstats,
+          UUIDs, status, help):
     name = f.__name__
     funcs[name] = f
 

Modified: zc.async/trunk/src/zc/async/monitordb.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitordb.txt	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/monitordb.txt	2008-09-15 02:40:49 UTC (rev 91156)
@@ -2,11 +2,11 @@
 ====================================================
 
 The zc.async database activity can be monitored and introspected via
-zc.z3monitor plugins.  Let's imagine we have a connection over which we can
+zc.monitor plugins.  Let's imagine we have a connection over which we can
 send text messages to the monitor server [#setUp]_.
 
 All monitoring is done through the ``asyncdb`` command.  Here is its
-description, using the zc.z3monitor ``help`` command.
+description, using the zc.monitor ``help`` command.
 
     >>> connection.test_input('help asyncdb\n')
     Help for asyncdb:
@@ -31,11 +31,12 @@
     <BLANKLINE>
     UUIDs: Return all active UUIDs.
     count: Count jobs in one or more states.
-    firstjob: Return summary of first job found matching given filters.
     help: Get help on an asyncdb monitor tool.
-    job: Return summary of job identified by integer oid.
+    job: Return details of job identified by integer oid.
     jobs: Return jobs in one or more states.
     jobstats: Return statistics about jobs in one or more states.
+    lastcompleted: Return details of the most recently completed job.
+    nextpending: Return details of the next job in queue to be performed.
     status: Return status of the agents of all queues and all active UUIDs.
     traceback: Return the traceback for the job identified by integer oid. 
     -> CLOSE
@@ -265,8 +266,8 @@
         "active start": null, 
         "agent": null, 
         "args": [
-            18, 
-            18
+            "18", 
+            "18"
         ], 
         "begin after": "2006-08-10T15:44:23.000211Z", 
         "callbacks": [
@@ -279,7 +280,7 @@
         "failed": false, 
         "initial callbacks end": null, 
         "kwargs": {
-            "start": 6
+            "start": "6"
         }, 
         "queue": "", 
         "quota names": [], 
@@ -292,6 +293,7 @@
     } 
     -> CLOSE
 
+
 Specifying the database name is equivalent (note that, confusingly, this
 database's name is "unnamed").
 
@@ -302,8 +304,8 @@
         "active start": null, 
         "agent": null, 
         "args": [
-            18, 
-            18
+            "18", 
+            "18"
         ], 
         "begin after": "2006-08-10T15:44:23.000211Z", 
         "callbacks": [
@@ -316,7 +318,7 @@
         "failed": false, 
         "initial callbacks end": null, 
         "kwargs": {
-            "start": 6
+            "start": "6"
         }, 
         "queue": "", 
         "quota names": [], 
@@ -329,9 +331,9 @@
     } 
     -> CLOSE
 
-We can also use ``firstjob`` to look at the first job in a given state.
+We can also use ``nextpending`` to look at the next job to be performed.
 
-    >>> connection.test_input('asyncdb firstjob pending\n')
+    >>> connection.test_input('asyncdb nextpending\n')
     {
         "active": null, 
         "active end": null, 
@@ -355,10 +357,10 @@
     } 
     -> CLOSE
 
-The ``jobs``, ``count``, ``jobstats`` and ``firstjob`` commands all support the
-same filtering options, which allow you to filter against a certain dispatcher,
-a certain agent, or a string for the callable, or for various times.  Here's an
-example of filtering by callable name.
+The ``jobs``, ``count``, ``jobstats``, ``nextpending``, and ``lastcompleted``
+commands all support similar filtering options, which allow you to filter
+against a certain dispatcher, a certain agent, or a string for the callable, or
+for various times.  Here's an example of filtering by callable name.
 
     >>> connection.test_input('asyncdb count pending callable:send_message\n')
     1 
@@ -550,8 +552,9 @@
     -> CLOSE
 
 Now, let's look at some of the date-based filters: ``requested_start``,
-``start``, ``end``, and ``callbacks_completed``.  They filter
-the results of the ``jobs``, ``count``, ``jobstats`` and ``firstjob`` tools.
+``start``, ``end``, and ``callbacks_completed``.  They filter the results of
+the ``jobs``, ``count``, ``jobstats``, ``nextpending`` and ``lastcompleted``
+tools.
 
 Each may be of the form "sinceINTERVAL", "beforeINTERVAL", or
 "sinceINTERVAL,beforeINTERVAL".  Intervals are of the format
@@ -710,7 +713,7 @@
 about completed jobs rotates out periodically from the database, so, by
 default, you only have about seven days worth of completed jobs).
 
-    >>> connection.test_input('asyncdb firstjob completed\n')
+    >>> connection.test_input('asyncdb lastcompleted\n')
     {
         "active": {
             "seconds": 6.0
@@ -738,7 +741,7 @@
 
 What's the first completed job that ended more than 5 seconds ago?
 
-    >>> connection.test_input('asyncdb firstjob completed end:before5S\n')
+    >>> connection.test_input('asyncdb lastcompleted end:before5S\n')
     ... # doctest: +ELLIPSIS
     {
         "active": {
@@ -1168,20 +1171,20 @@
     >>> transaction.commit()
 
     >>> import zc.ngi.testing
-    >>> import zc.z3monitor
+    >>> import zc.monitor
 
     >>> connection = zc.ngi.testing.TextConnection()
-    >>> server = zc.z3monitor.Server(connection)
+    >>> server = zc.monitor.Server(connection)
 
     >>> import zc.async.monitordb
     >>> import zope.component
-    >>> import zc.z3monitor.interfaces
+    >>> import zc.monitor.interfaces
     >>> zope.component.provideUtility(
     ...     zc.async.monitordb.asyncdb,
-    ...     zc.z3monitor.interfaces.IZ3MonitorPlugin,
+    ...     zc.monitor.interfaces.IMonitorPlugin,
     ...     'asyncdb')
-    >>> zope.component.provideUtility(zc.z3monitor.help,
-    ...     zc.z3monitor.interfaces.IZ3MonitorPlugin, 'help')
+    >>> zope.component.provideUtility(zc.monitor.help,
+    ...     zc.monitor.interfaces.IMonitorPlugin, 'help')
 
 .. [#tearDown]
     >>> threads = []

Added: zc.async/trunk/src/zc/async/monitortests.py
===================================================================
--- zc.async/trunk/src/zc/async/monitortests.py	                        (rev 0)
+++ zc.async/trunk/src/zc/async/monitortests.py	2008-09-15 02:40:49 UTC (rev 91156)
@@ -0,0 +1,44 @@
+##############################################################################
+#
+# Copyright (c) 2006-2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import os
+import unittest
+from zope.testing import doctest, module
+import zope.component.testing
+import zc.ngi.async # to quiet the thread complaints from the testing
+# infrastructure, because there is no API way as of this writing to stop the
+# z3monitor server or the zc.ngi.async thread. :-(
+
+import zc.async.tests
+
+def setUp(test):
+    zc.async.tests.modSetUp(test)
+    # make the uuid stable for these tests
+    f = open(os.environ["ZC_ASYNC_UUID"], 'w')
+    # any fixed value will do; a stable one keeps test output deterministic
+    f.writelines(('d10f43dc-ffdf-11dc-abd4-0017f2c49bdd',))
+    f.close()
+    zc.async.instanceuuid.UUID = zc.async.instanceuuid.getUUID()
+
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocFileSuite(
+            'monitor.txt',
+            'monitordb.txt',
+            setUp=setUp, tearDown=zc.async.tests.modTearDown,
+            optionflags=doctest.INTERPRET_FOOTNOTES),
+        ))
+
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')

Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/testing.py	2008-09-15 02:40:49 UTC (rev 91156)
@@ -259,21 +259,49 @@
     else:
         assert False, 'annotation never found'
 
-def shut_down_and_wait(dispatcher):
+
+class TearDownDispatcherError(RuntimeError):
+    pass
+
+def tear_down_dispatcher(dispatcher):
     threads = []
     for queue_pools in dispatcher.queues.values():
         for pool in queue_pools.values():
-            threads.extend(pool.threads)
+            threads.extend((thread, pool) for thread in pool.threads)
+    problems = []
     dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
     dispatcher.thread.join(3)
+    if dispatcher.thread.isAlive():
+        problems.append(
+            'Dispatcher (%s, %s) failed to stop.' %
+            (dispatcher.thread.getName(), dispatcher.UUID))
     # in most cases, this is unnecessary, but in some instances, such as in
     # some examples in catastrophes.txt, this is needed.
     for queue_pools in dispatcher.queues.values():
         for pool in queue_pools.values():
            pool.setSize(0)
     # this makes sure that all the worker threads have a chance to stop.
-    for thread in threads:
+    for thread, pool in threads:
         thread.join(3)
+        if thread.isAlive():
+            name = thread.getName()
+            jobid = pool.jobids.get(name)
+            # from here, we could try going to the database, or to the past
+            # jobs in the dispatcher's rotating history.  We'll just go with
+            # the dispatcher's history--without trying to open the database.
+            if jobid is not None:
+                jobinfo = dispatcher.jobs.get(jobid)
+                if jobinfo is None:
+                    jobid = str(jobid)
+                else:
+                    jobid = jobinfo['call']
+            else:
+                jobid = '[job unknown]'
+            problems.append(
+                'Job in pool %r failed to stop: %s' % (pool.name, jobid))
+    if problems:
+        problems = '\n' + '\n'.join(problems)
+        raise TearDownDispatcherError(problems)
 
 def print_logs(log_file=sys.stdout, log_level=logging.CRITICAL):
     # really more of a debugging tool
@@ -281,6 +309,6 @@
     # stashing this on the dispatcher is a hack, but at least we're doing
     # it on code from the same package.
     handler = logging.StreamHandler(log_file)
-    logger.setLevel(log_level)
+    handler.level = log_level
     logger.addHandler(handler)
     return handler
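The new ``tear_down_dispatcher`` above joins each worker thread with a timeout and reports any stragglers (with the offending job where it can be identified) instead of hanging or failing silently. A reduced sketch of that join-and-report pattern, with illustrative names rather than zc.async's API:

```python
import threading

def join_all(threads, timeout=3.0):
    # Join each thread with a bounded wait; collect a report for any
    # thread still alive afterwards, then raise once with all of them.
    problems = []
    for thread in threads:
        thread.join(timeout)
        if thread.is_alive():
            problems.append('Thread %s failed to stop' % thread.name)
    if problems:
        raise RuntimeError('\n' + '\n'.join(problems))
```

The single aggregated exception mirrors ``TearDownDispatcherError``: a test run reports every stuck job at once rather than only the first.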

Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/utils.py	2008-09-15 02:40:49 UTC (rev 91156)
@@ -11,6 +11,7 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
+import bisect
 import datetime
 import logging
 import sys
@@ -352,3 +353,26 @@
         return '%s.%s' % (obj.__module__, obj.__name__)
     else:
         return repr(obj)
+
+def sortedmerge(sources, key=None):
+    if key is None:
+        key = lambda item: item
+    sorted_sources = []
+    for src in sources:
+        iterator = iter(src)
+        try:
+            first = iterator.next()
+        except StopIteration:
+            pass
+        else:
+            sorted_sources.append((key(first), first, iterator))
+    sorted_sources.sort()
+    while sorted_sources:
+        ignore, result, iterator = sorted_sources.pop(0)
+        yield result
+        try:
+            next = iterator.next()
+        except StopIteration:
+            pass
+        else:
+            bisect.insort(sorted_sources, (key(next), next, iterator))
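``sortedmerge`` above is a lazy k-way merge of already-sorted sources, used for the new merged completed/pending/in-agent job listings. In current Python the stdlib provides the same behavior as ``heapq.merge`` (its ``key`` argument requires Python 3.5+); a small demonstration with job-like ``(timestamp, id)`` tuples:

```python
import heapq

# Three sources, each already sorted -- as the queue data structures are.
completed = [(1, 'job-a'), (4, 'job-d')]
pending = [(2, 'job-b'), (5, 'job-e')]
in_agent = [(3, 'job-c')]

# heapq.merge lazily yields items from all sorted inputs in order,
# the same behavior sortedmerge implements with bisect.insort.
merged = list(heapq.merge(completed, pending, in_agent))
```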

Modified: zc.async/trunk/src/zc/async/z3tests.py
===================================================================
--- zc.async/trunk/src/zc/async/z3tests.py	2008-09-15 01:55:13 UTC (rev 91155)
+++ zc.async/trunk/src/zc/async/z3tests.py	2008-09-15 02:40:49 UTC (rev 91156)
@@ -20,16 +20,8 @@
 # z3monitor server or the zc.ngi.async thread. :-(
 
 import zc.async.tests
+import zc.async.monitortests
 
-def setUp(test):
-    zc.async.tests.modSetUp(test)
-    # make the uuid stable for these tests
-    f = open(os.environ["ZC_ASYNC_UUID"], 'w')
-    # make this stable for test purposes
-    f.writelines(('d10f43dc-ffdf-11dc-abd4-0017f2c49bdd',))
-    f.close()
-    zc.async.instanceuuid.UUID = zc.async.instanceuuid.getUUID()
-
 def tearDown(test):
     import zc.async.dispatcher
     zc.async.dispatcher.pop()
@@ -38,10 +30,9 @@
 def test_suite():
     return unittest.TestSuite((
         doctest.DocFileSuite(
-            'monitor.txt',
-            'monitordb.txt',
             'z3.txt',
-            setUp=setUp, tearDown=zc.async.tests.modTearDown,
+            setUp=zc.async.monitortests.setUp,
+            tearDown=zc.async.tests.modTearDown,
             optionflags=doctest.INTERPRET_FOOTNOTES),
         doctest.DocFileSuite(
             'README_3a.txt',



More information about the Checkins mailing list