[Checkins] SVN: zc.async/trunk/src/zc/async/ Write test for basic example of ReassignedError. Add helpers, some clean-up.

Gary Poster gary at modernsongs.com
Thu Sep 4 21:14:24 EDT 2008


Log message for revision 90856:
  Write test for basic example of ReassignedError.  Add helpers, some clean-up.
  
  The following new features came from working on the test.
  
  - The dispatcher's ``getActiveJobIds`` method now tells you what's going on
    in the worker threads at this instant, rather than what's going on in the
    database.  The poll's ``active jobs`` keys continue to report what was true
    *in the database* as of *the last poll*.  This change also affects the
    ``async jobs`` monitor command.
  
  - The dispatcher method ``getJobInfo`` (and the monitor command ``async job``)
    now returns the name of the queue for the job, the name of the agent for the
    job, and whether the job has been, or was, reassigned.
  
  Also tweaked monitor descriptions.
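
  An illustrative sketch (not part of the commit) of reading the new
  information from a running dispatcher; it assumes a dispatcher has already
  been installed in this process, for example via
  ``zc.async.subscribers.ThreadedDispatcherInstaller``::

      import zc.async.dispatcher

      dispatcher = zc.async.dispatcher.get()
      # getActiveJobIds now reports what the worker threads are doing at
      # this instant, without consulting the database.
      for job_id in dispatcher.getActiveJobIds():
          # each id is an (integer oid, database name) pair
          info = dispatcher.getJobInfo(*job_id)
          # the mapping now also carries 'queue', 'agent', and 'reassigned'
          print info['queue'], info['agent'], info['reassigned']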
  

Changed:
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt
  U   zc.async/trunk/src/zc/async/README_1.txt
  U   zc.async/trunk/src/zc/async/catastrophes.txt
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/dispatcher.txt
  U   zc.async/trunk/src/zc/async/ftesting.py
  U   zc.async/trunk/src/zc/async/job.py
  U   zc.async/trunk/src/zc/async/monitor.py
  U   zc.async/trunk/src/zc/async/monitor.txt
  U   zc.async/trunk/src/zc/async/testing.py
  U   zc.async/trunk/src/zc/async/tests.py

-=-
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-09-05 01:14:23 UTC (rev 90856)
@@ -51,12 +51,25 @@
   double-check that the job is registered as being performed by itself.
   If not, the process should abort the transaction, make an error
   log, and give up on the job.  Write conflict errors on the job should
-  protect us from the edge cases in this story.  XXX test
+  protect us from the edge cases in this story.
 
 - Make ftesting try to join worker threads, in addition to polling thread,
   to try to eliminate intermittent test-runner warnings in ftests that a
   thread is left behind.  XXX test
 
+- The dispatcher's ``getActiveJobIds`` method now tells you what's going on
+  in the worker threads at this instant, rather than what's going on in the
+  database.  The poll's ``active jobs`` keys continue to report what was true
+  *in the database* as of *the last poll*.  This change also affects the
+  ``async jobs`` monitor command.
+
+- The dispatcher method ``getJobInfo`` (and the monitor command ``async job``)
+  now returns the name of the queue for the job, the name of the agent for the
+  job, and whether the job has been, or was, reassigned.
+
+- zc.async events inherit from 'zope.component.interfaces.IObjectEvent' instead
+  of a zc.async-specific IObjectEvent (thanks to Satchit Haridas).
+
 1.4.1 (2008-07-30)
 ==================
 

Modified: zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt
===================================================================
--- zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/QUICKSTART_2_GROK.txt	2008-09-05 01:14:23 UTC (rev 90856)
@@ -60,9 +60,10 @@
    Unfortunately, building a clean, standalone workable Python 2.4.5 on OS X is
    not obvious.  This is what I recommend, if you are working on that platform.
    
-   First you need macports.  Go to macports.org and download the newest
-   version.  It doesn't seem to set up the manual path correctly, so after the
-   installation add this to your ~/.profile (or in a similar place)::
+   First you need macports.  Go to http://www.macports.org/ and download the
+   newest version.  It doesn't seem to set up the manual path correctly, so
+   after the installation add this to your ``~/.profile`` (or in a similar
+   place)::
    
     export MANPATH=/opt/local/man:$MANPATH
    
@@ -71,9 +72,9 @@
    you are working in and open a new one.
 
    Download a source distribution of Python 2.4.5.  You may have your own
-   approach as to where to put things, but I'll go with this pattern in this
-   document: ~/src will hold expanded source trees, ~/opt will hold our local
-   Python, and we'll develop in ~/dev.
+   approach as to where to put things, but I go with this pattern: ``~/src``
+   holds expanded source trees, ``~/opt`` holds our local Python, and I develop
+   in ``~/dev``.
    
    We will want readline and need zlib from macports.
 

Modified: zc.async/trunk/src/zc/async/README_1.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_1.txt	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/README_1.txt	2008-09-05 01:14:23 UTC (rev 90856)
@@ -965,10 +965,12 @@
     1
     >>> info = dispatcher.getJobInfo(*job_ids[0])
     >>> pprint.pprint(info) # doctest: +ELLIPSIS
-    {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
+    {'agent': 'main',
+     'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
      'completed': None,
      'failed': False,
      'poll id': ...,
+     'queue': '',
      'quota names': (),
      'reassigned': False,
      'result': None,
@@ -1007,15 +1009,18 @@
     []
     >>> info = dispatcher.getJobInfo(*job_ids[0])
     >>> pprint.pprint(info) # doctest: +ELLIPSIS
-    {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
+    {'agent': 'main',
+     'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
      'completed': datetime.datetime(...),
      'failed': False,
      'poll id': ...,
+     'queue': '',
      'quota names': (),
      'reassigned': False,
      'result': '42',
      'started': datetime.datetime(...),
      'thread': ...}
+
      >>> info['thread'] is not None
      True
      >>> info['poll id'] is not None

Modified: zc.async/trunk/src/zc/async/catastrophes.txt
===================================================================
--- zc.async/trunk/src/zc/async/catastrophes.txt	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/catastrophes.txt	2008-09-05 01:14:23 UTC (rev 90856)
@@ -435,12 +435,9 @@
     >>> agent.chooser = zc.async.agent.chooseFirst
     >>> transaction.commit()
     >>> lock1.release()
-    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
-    {'': {'main': {'active jobs': [],
-                   'error': None,
-                   'len': 0,
-                   'new jobs': [(..., 'unnamed')],
-                   'size': 3}}}
+    >>> info = zc.async.testing.get_poll(dispatcher)['']['main']
+    >>> len(info['active jobs'] + info['new jobs'])
+    1
     >>> transaction.TransactionManager.commit = old_commit
     >>> zc.async.testing.wait_for_result(job)
     42
@@ -544,7 +541,7 @@
     >>> old_dispatcher = dispatcher
     >>> zc.async.dispatcher.clear()
     >>> zc.async.subscribers.ThreadedDispatcherInstaller(
-    ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+    ...         poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
     >>> dispatcher = zc.async.dispatcher.get()
     >>> zc.async.testing.wait_for_result(interrupt_job)
 
@@ -641,7 +638,7 @@
     >>> old_dispatcher = dispatcher
     >>> zc.async.dispatcher.clear()
     >>> zc.async.subscribers.ThreadedDispatcherInstaller(
-    ...         poll_interval=0.5)(zc.async.interfaces.DatabaseOpened(db))
+    ...         poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
     >>> dispatcher = zc.async.dispatcher.get()
 
 Initially, it's going to be a bit confused, because it sees that the
@@ -691,14 +688,19 @@
 instance of the dispatcher.
 
     >>> poll = zc.async.testing.get_poll(dispatcher)
+    >>> t = transaction.begin()
+    >>> da.ping_death_interval = datetime.timedelta(seconds=60)
+    >>> transaction.commit()
+    >>> from zc.async.testing import time_sleep
     >>> def wait_for_pending(job):
-    ...     for i in range(60):
+    ...     for i in range(600):
     ...         t = transaction.begin()
     ...         if job.status in (zc.async.interfaces.PENDING):
     ...             break
-    ...         time_sleep(0.1)
+    ...         time_sleep(0.01)
     ...     else:
-    ...         assert False, 'job never pending'
+    ...         assert False, 'job never pending: ' + str(job.status)
+    ...
     >>> wait_for_pending(job)
     >>> job in da['main']
     False
@@ -723,8 +725,10 @@
 
 The dispatcher cleaned up its own "hard" crash.
 
-[#cleanup2]_
+[#cleanup1]_
 
+.. _hard-crash-with-sibling-recovery:
+
 Hard Crash During Job with Sibling Recovery
 -------------------------------------------
 
@@ -757,14 +761,6 @@
     ...     zc.async.interfaces.DatabaseOpened(db))
     >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)
 
-We're also going to set the main dispatcher's ``ping_death_interval`` back to
-60 seconds so we can see some polls in the alternate dispatcher before it gets
-around to cleaning up.
-
-    >>> da.ping_death_interval = datetime.timedelta(seconds=60)
-    >>> transaction.commit()
-    >>> 
-
 Now we'll "crash" the dispatcher.
 
     >>> dispatcher.activated = False # this will make polling stop, without
@@ -806,14 +802,14 @@
 speed up the realization of our second dispatcher that the first one is dead,
 we'll set the ping_death_interval back down to just one second.
 
+    >>> bool(da.activated)
+    True
     >>> da.ping_death_interval
     datetime.timedelta(0, 60)
     >>> import datetime
     >>> da.ping_death_interval = datetime.timedelta(seconds=1)
     >>> transaction.commit()
     >>> zc.async.testing.wait_for_death(da)
-    >>> bool(da.activated)
-    True
 
 After the second dispatcher gets a poll--a chance to notice--it will have
 cleaned up the first dispatcher's old tasks in the same way we saw in the
@@ -844,7 +840,7 @@
 The sibling, then, was able to clean up the mess left by the "hard" crash of
 the first dispatcher.
 
-[#cleanup3]_
+[#cleanup2]_
 
 Other Job-Related Errors
 ------------------------
@@ -871,7 +867,7 @@
     >>> zope.component.provideHandler(zc.async.subscribers.queue_installer)
     >>> zope.component.provideHandler(
     ...     zc.async.subscribers.ThreadedDispatcherInstaller(
-    ...         poll_interval=0.5))
+    ...         poll_interval=0.1))
     >>> zope.component.provideHandler(zc.async.subscribers.agent_installer)
     >>> import zope.event
     >>> import zc.async.interfaces
@@ -882,33 +878,11 @@
 .. [#cleanup1]
 
     >>> lock.release()
-    >>> old_dispatcher.thread.join(3)
-    >>> old_dispatcher.dead_pools[0].threads[0].join(3)
+    >>> zc.async.testing.shut_down_and_wait(old_dispatcher)
 
 .. [#cleanup2]
 
     >>> lock.release()
-    >>> old_dispatcher.thread.join(3)
-    >>> for queue_pools in old_dispatcher.queues.values():
-    ...     for name, pool in queue_pools.items():
-    ...         pool.setSize(0)
-    ...         for thread in pool.threads:
-    ...             thread.join(3)
-    ...
-    -3
-
-.. [#cleanup3]
-
-    >>> lock.release()
-    >>> dispatcher.thread.join(3)
-    >>> for queue_pools in dispatcher.queues.values():
-    ...     for name, pool in queue_pools.items():
-    ...         pool.setSize(0)
-    ...         for thread in pool.threads:
-    ...             thread.join(3)
-    ...
-    -3
-    >>> alt_dispatcher.reactor.callFromThread(alt_dispatcher.reactor.stop)
-    >>> alt_dispatcher.thread.join(3)
-    >>> alt_dispatcher.dead_pools[0].threads[0].join(3)
+    >>> zc.async.testing.shut_down_and_wait(dispatcher)
+    >>> zc.async.testing.shut_down_and_wait(alt_dispatcher)
     >>> time.sleep = old_sleep
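
The pattern used by ``wait_for_pending`` above generalizes; an illustrative
helper in the same spirit (not part of the commit) that polls the database
until a job reaches a given status might look like this::

    import time

    import transaction
    import zc.async.interfaces

    def wait_for_status(job, status, attempts=600, delay=0.01):
        # poll, syncing with the database each time, until the job reaches
        # ``status``; give up after ``attempts`` tries
        for i in range(attempts):
            transaction.begin()
            if job.status == status:
                return
            time.sleep(delay)
        raise AssertionError('job never reached %r; still %r'
                             % (status, job.status))

    # for example: wait_for_status(job, zc.async.interfaces.PENDING)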

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-09-05 01:14:23 UTC (rev 90856)
@@ -48,18 +48,22 @@
     initial_backoff = 5
     incremental_backoff = 5
     maximum_backoff = 60
+    jobid = None
 
     def __init__(self, dispatcher, name, size):
         self.dispatcher = dispatcher
         self.name = name
         self.queue = Queue.Queue(0)
         self.threads = []
+        self.jobids = {}
         self.setSize(size)
 
     def getSize(self):
         return self._size
 
     def perform_thread(self):
+        thread_id = thread.get_ident()
+        self.jobids[thread_id] = None
         zc.async.local.dispatcher = self.dispatcher
         zc.async.local.name = self.name # this is the name of this pool's agent
         conn = self.dispatcher.db.open()
@@ -67,7 +71,8 @@
             job_info = self.queue.get()
             while job_info is not None:
                 identifier, dbname, info = job_info
-                info['thread'] = thread.get_ident()
+                self.jobids[thread_id] = (ZODB.utils.u64(identifier), dbname)
+                info['thread'] = thread_id
                 info['started'] = datetime.datetime.utcnow()
                 zc.async.utils.tracelog.info(
                     'starting in thread %d: %s',
@@ -175,6 +180,7 @@
                 zc.async.utils.tracelog.info(
                     'completed in thread %d: %s',
                     info['thread'], info['call'])
+                self.jobids[thread_id] = None
                 job_info = self.queue.get()
         finally:
             conn.close()
@@ -182,6 +188,7 @@
                 # this may cause some bouncing, but we don't ever want to end
                 # up with fewer than needed.
                 self.dispatcher.reactor.callFromThread(self.setSize)
+            del self.jobids[thread_id]
 
     def setSize(self, size=None):
         # this should only be called from the thread in which the reactor runs
@@ -381,6 +388,8 @@
                         else:
                             info = {'result': None,
                                     'failed': False,
+                                    'agent': name,
+                                    'queue': queue.name,
                                     'poll id': None,
                                     'quota names': job.quota_names,
                                     'call': repr(job),
@@ -558,28 +567,25 @@
                 raise ValueError('ambiguous database name')
             else:
                 database_name = minKey[1]
-        return self.jobs[(oid, database_name)]
+        res = self.jobs[(oid, database_name)]
+        if res['completed'] is None:
+            jobid = (oid, database_name)
+            info = self.polls.first()[res['queue']][res['agent']]
+            if (jobid not in info['active jobs'] and
+                jobid not in info['new jobs']):
+                res = res.copy()
+                res['reassigned'] = True
+        return res
 
     def getActiveJobIds(self, queue=None, agent=None):
         """returns active jobs from newest to oldest"""
         res = []
-        try:
-            poll = self.polls.first()
-        except ValueError:
-            pass
-        else:
-            old = []
-            unknown = []
-            for info in _iter_info(poll, queue, agent):
-                for jobs in (info['new jobs'], info['active jobs']):
-                    for job_id in jobs:
-                        job_info = self.jobs.get(job_id)
-                        if job_info is None:
-                            unknown.append(job_id)
-                        elif not job_info['completed']:
-                            bisect.insort(old, (job_info['poll id'], job_id))
-            res.extend(i[1] for i in old)
-            res.extend(unknown)
+        for queue_name, agents in self.queues.items():
+            if queue is None or queue_name == queue:
+                for agent_name, pool in agents.items():
+                    if agent is None or agent_name == agent:
+                        res.extend(val for val in pool.jobids.values()
+                                   if val is not None)
         return res
 
     def getPollInfo(self, at=None, before=None):
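
A schematic restatement (not the actual implementation) of the bookkeeping the
``dispatcher.py`` change adds: each worker thread records the id of the job it
is currently running in its pool's ``jobids`` mapping, which is what lets
``getActiveJobIds`` answer "what are the threads doing right now" without
touching the database::

    import thread  # Python 2 standard library, as used by dispatcher.py

    class Pool(object):
        def __init__(self):
            # thread id -> (integer oid, database name), or None when idle
            self.jobids = {}

        def perform_thread(self, get_job):
            thread_id = thread.get_ident()
            self.jobids[thread_id] = None
            try:
                job_id = get_job()
                while job_id is not None:
                    self.jobids[thread_id] = job_id  # busy with this job
                    # ... perform the job here ...
                    self.jobids[thread_id] = None    # idle again
                    job_id = get_job()
            finally:
                del self.jobids[thread_id]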

Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/dispatcher.txt	2008-09-05 01:14:23 UTC (rev 90856)
@@ -290,10 +290,12 @@
     >>> info = dispatcher.getJobInfo(*poll['']['main']['new jobs'][0])
     >>> pprint.pprint(info)
     ... # doctest: +ELLIPSIS
-    {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``<built-in function mul>(14, 3)``>",
+    {'agent': 'main',
+     'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``<built-in function mul>(14, 3)``>",
      'completed': datetime.datetime(...),
      'failed': False,
      'poll id': ...,
+     'queue': '',
      'quota names': (),
      'reassigned': False,
      'result': '42',

Modified: zc.async/trunk/src/zc/async/ftesting.py
===================================================================
--- zc.async/trunk/src/zc/async/ftesting.py	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/ftesting.py	2008-09-05 01:14:23 UTC (rev 90856)
@@ -47,11 +47,5 @@
         logger = logging.getLogger('zc.async')
         logger.removeHandler(dispatcher._debug_handler)
         del dispatcher._debug_handler
-    dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
-    dispatcher.thread.join(3)
-    for queue_pools in dispatcher.queues.values():
-        for name, pool in queue_pools.items():
-            pool.setSize(0)
-            for thread in pool.threads:
-                thread.join(3)
+    zc.async.testing.shut_down_and_wait(dispatcher)
     zc.async.dispatcher.clear()

Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/job.py	2008-09-05 01:14:23 UTC (rev 90856)
@@ -819,7 +819,7 @@
             # It's debatable whether this is CRITICAL or ERROR level.  We'll
             # go with ERROR for now.
             zc.async.utils.log.error(
-                'Job %r was reassigned.  Likely cause was that polling was '
+                '%r was reassigned.  Likely cause was that polling was '
                 'unable to occur as regularly as expected, perhaps because of '
                 'long commit times in the application.', self)
             raise zc.async.interfaces.ReassignedError()

Modified: zc.async/trunk/src/zc/async/monitor.py
===================================================================
--- zc.async/trunk/src/zc/async/monitor.py	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/monitor.py	2008-09-05 01:14:23 UTC (rev 90856)
@@ -47,20 +47,21 @@
             return str(obj)
         return simplejson.JSONEncoder.default(self, obj)
 
-encoder = Encoder(sort_keys=True, indent=4) 
+encoder = Encoder(sort_keys=True, indent=4)
 
 
 def status(uuid=None):
-    """Get general zc.async dispatcher information.
-    
-    'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED'."""
+    """Get a mapping of general zc.async dispatcher information.
+
+    'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED', where
+    'STUCK' means the poll is past due."""
     if uuid is not None:
         uuid = uuid.UUID(uuid)
     return encoder.encode(zc.async.dispatcher.get(uuid).getStatusInfo())
 
 def jobs(queue=None, agent=None, uuid=None):
-    """Show active jobs as of last poll, sorted from newest to oldest.
-    
+    """Show active jobs in worker threads as of the instant.
+
     Usage:
 
         jobs
@@ -68,7 +69,7 @@
 
         jobs queue:<queue name>
         (jobs are filtered to those coming from the named queue)
-        
+
         jobs agent:<agent name>
         (jobs are filtered to those coming from agents with given name)
 
@@ -86,7 +87,22 @@
 def job(OID, database=None, uuid=None):
     """Local information about a job as of last poll, if known.
 
-    Does not consult ZODB, but in-memory information."""
+    Does not consult ZODB, but in-memory information.
+
+    Usage:
+
+        job <job id>
+        (returns information about the job)
+
+        job <job id> database:<database name>
+        (returns job information, with job id disambiguated by database name)
+
+    The job id in this case is an integer such as those returned by the
+    ``async jobs`` command or in the ``longest ...`` and ``shortest ...``
+    values of the ``async jobstats`` command.  It is the integer version of the
+    oid of the job, and can be converted to an oid with ``ZODB.utils.p64``, and
+    converted back to an integer with ``ZODB.utils.u64``.
+    """
     if uuid is not None:
         uuid = uuid.UUID(uuid)
     return encoder.encode(
@@ -109,12 +125,12 @@
                 minutes=vals.get('M', 0),
                 seconds=vals.get('S', 0)) + datetime.datetime.utcnow()
     return res
-                
 
+
 def jobstats(at=None, before=None, since=None, queue=None, agent=None,
              uuid=None):
     """Statistics on historical jobs as of last poll.
-    
+
     Usage:
 
         jobstats
@@ -140,10 +156,10 @@
 
     Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
     be replaced with a positive integer, and "D," "H," "M," and "S" are
-    literals standing for "days," "hours," "minutes," and "seconds." 
+    literals standing for "days," "hours," "minutes," and "seconds."
     For instance, you might use ``5M`` for five minutes, ``20S`` for
     twenty seconds, or ``1H30M`` for an hour and a half.
-    
+
     Poll keys are the values shown as "key" from the ``poll`` or ``polls``
     command.
 
@@ -160,26 +176,26 @@
 
 def poll(at=None, before=None, uuid=None):
     """Get information about a single poll, defaulting to most recent.
-    
+
     Usage:
-    
+
         poll
         (returns most recent poll)
-        
+
         poll at:<poll key or interval>
         (returns poll at or before the poll key or interval)
-        
+
         poll before:<poll key or interval>
         (returns poll before the poll key or interval)
 
     Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
     be replaced with a positive integer, and "D," "H," "M," and "S" are
-    literals standing for "days," "hours," "minutes," and "seconds." 
+    literals standing for "days," "hours," "minutes," and "seconds."
     For instance, you might use ``5M`` for five minutes, ``20S`` for
     twenty seconds, or ``1H30M`` for an hour and a half.
-    
+
     Example:
-    
+
         async poll at:5M
         (get the poll information at five minutes ago or before)"""
     # TODO: parse at and before to datetimes
@@ -192,21 +208,21 @@
 
 def polls(at=None, before=None, since=None, count=None, uuid=None):
     """Get information about recent polls, defaulting to most recent.
-    
+
     Usage:
-    
+
         polls
         (returns most recent 3 poll)
-        
+
         polls at:<poll key or interval>
         (returns up to 3 polls at or before the poll key or interval)
-        
+
         polls before:<poll key or interval>
         (returns up to 3 polls before the poll key or interval)
-        
+
         polls since:<poll key or interval>
         (returns polls since the poll key or interval, inclusive)
-        
+
         polls count <positive integer>
         (returns the given number of the most recent files)
 
@@ -215,12 +231,12 @@
 
     Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
     be replaced with a positive integer, and "D," "H," "M," and "S" are
-    literals standing for "days," "hours," "minutes," and "seconds." 
+    literals standing for "days," "hours," "minutes," and "seconds."
     For instance, you might use ``5M`` for five minutes, ``20S`` for
     twenty seconds, or ``1H30M`` for an hour and a half.
-    
+
     Example:
-    
+
         async polls before:5M since:10M
         (get the poll information from 5 to 10 minutes ago)"""
     if uuid is not None:
@@ -252,7 +268,7 @@
 
 def help(cmd=None):
     """Get help on an async monitor tool.
-    
+
     Usage is 'async help <tool name>' or 'async help'."""
     if cmd is None:
         res = [
@@ -271,14 +287,9 @@
 for f in status, jobs, job, jobstats, poll, polls, utcnow, UUID, help:
     funcs[f.__name__] = f
 
-def async(connection, cmd=None, *raw):
-    """A collection of tools to monitor zc.async activity in this process.
-    
-    To see a list of async tools, use 'async help'.
-    
-    To learn more about an async monitor tool, use 'async help <tool name>'."""
+def monitor(funcs, help, connection, cmd, raw):
     if cmd is None:
-        res = async.__doc__
+        res = help
     else:
         f = funcs.get(cmd)
         if f is None:
@@ -300,4 +311,21 @@
     connection.write(res)
     connection.write('\n')
 
-    
+def async(connection, cmd=None, *raw):
+    """A collection of tools to monitor zc.async activity in this process.
+
+    To see a list of async tools, use 'async help'.
+
+    To learn more about an async monitor tool, use 'async help <tool name>'."""
+    monitor(funcs, async.__doc__, connection, cmd, raw)
+
+def asyncdb(connection, cmd=None, *raw):
+    """A collection of tools to monitor zc.async activity in the database.
+
+    To see a list of asyncdb tools, use 'asyncdb help'.
+
+    To learn more about an asyncdb monitor tool, use 'asyncdb help <tool name>'.
+
+    ``asyncdb`` tools differ from ``async`` tools in that ``asyncdb`` tools
+    access the database, and ``async`` tools do not."""
+    monitor(dbfuncs, asyncdb.__doc__, connection, cmd, raw)
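
The new ``async job`` help text notes that the monitor's job id is the integer
form of the job's ZODB oid.  For instance, the conversion it describes looks
like this in Python 2::

    >>> from ZODB.utils import p64, u64
    >>> p64(30)
    '\x00\x00\x00\x00\x00\x00\x00\x1e'
    >>> u64(p64(30))
    30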

Modified: zc.async/trunk/src/zc/async/monitor.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitor.txt	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/monitor.txt	2008-09-05 01:14:23 UTC (rev 90856)
@@ -29,11 +29,11 @@
     UUID: Get instance UUID in hex.
     help: Get help on an async monitor tool.
     job: Local information about a job as of last poll, if known.
-    jobs: Show active jobs as of last poll, sorted from newest to oldest.
+    jobs: Show active jobs in worker threads as of the instant.
     jobstats: Statistics on historical jobs as of last poll.
     poll: Get information about a single poll, defaulting to most recent.
     polls: Get information about recent polls, defaulting to most recent.
-    status: Get general zc.async dispatcher information.
+    status: Get a mapping of general zc.async dispatcher information.
     utcnow: Return the current time in UTC, in ISO 8601 format. 
     -> CLOSE
 
@@ -72,9 +72,10 @@
   with all as ints except seconds, which is a float.
 
     >>> connection.test_input('async help status\n')
-    Get general zc.async dispatcher information.
+    Get a mapping of general zc.async dispatcher information.
     <BLANKLINE>
-        'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED'. 
+        'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED', where
+        'STUCK' means the poll is past due. 
     -> CLOSE
 
     >>> connection.test_input('async status\n')
@@ -100,7 +101,7 @@
 - several commands have the "queue:" and "agent:" modifiers.
 
     >>> connection.test_input('async help jobs\n')
-    Show active jobs as of last poll, sorted from newest to oldest.
+    Show active jobs in worker threads as of the instant.
     <BLANKLINE>
         Usage:
     <BLANKLINE>
@@ -163,7 +164,7 @@
     <BLANKLINE>
         Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
         be replaced with a positive integer, and "D," "H," "M," and "S" are
-        literals standing for "days," "hours," "minutes," and "seconds." 
+        literals standing for "days," "hours," "minutes," and "seconds."
         For instance, you might use ``5M`` for five minutes, ``20S`` for
         twenty seconds, or ``1H30M`` for an hour and a half.
     <BLANKLINE>
@@ -200,7 +201,7 @@
     Get information about a single poll, defaulting to most recent.
     <BLANKLINE>
         Usage:
-    <BLANKLINE>    
+    <BLANKLINE>
             poll
             (returns most recent poll)
     <BLANKLINE>
@@ -212,7 +213,7 @@
     <BLANKLINE>
         Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
         be replaced with a positive integer, and "D," "H," "M," and "S" are
-        literals standing for "days," "hours," "minutes," and "seconds." 
+        literals standing for "days," "hours," "minutes," and "seconds."
         For instance, you might use ``5M`` for five minutes, ``20S`` for
         twenty seconds, or ``1H30M`` for an hour and a half.
     <BLANKLINE>
@@ -259,7 +260,7 @@
     <BLANKLINE>
         Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
         be replaced with a positive integer, and "D," "H," "M," and "S" are
-        literals standing for "days," "hours," "minutes," and "seconds." 
+        literals standing for "days," "hours," "minutes," and "seconds."
         For instance, you might use ``5M`` for five minutes, ``20S`` for
         twenty seconds, or ``1H30M`` for an hour and a half.
     <BLANKLINE>
@@ -335,11 +336,14 @@
 
     >>> connection.test_input('async job 30\n') # doctest: +ELLIPSIS
     {
+        "agent": "main", 
         "call": "<zc.async.job.Job (oid 30, db 'unnamed') ``zc.async.doctest_test.send_message()``>", 
         "completed": "2006-08-10T15:44:...Z", 
         "failed": false, 
         "poll id": 6420106068024891087, 
+        "queue": "", 
         "quota names": [], 
+        "reassigned": false, 
         "result": "None", 
         "started": "2006-08-10T15:44:...Z", 
         "thread": ...
@@ -405,11 +409,14 @@
 
     >>> connection.test_input('async job 36\n') # doctest: +ELLIPSIS
     {
+        "agent": "main", 
         "call": "<zc.async.job.Job (oid 36, db 'unnamed') ``zc.async.doctest_test.wait_for_me()``>", 
         "completed": null, 
         "failed": false, 
         "poll id": 6420106067941005007, 
+        "queue": "", 
         "quota names": [], 
+        "reassigned": false, 
         "result": null, 
         "started": "2006-08-10T15:44:...Z", 
         "thread": ...
@@ -623,11 +630,14 @@
 
     >>> connection.test_input('async job 36\n') # doctest: +ELLIPSIS
     {
+        "agent": "main", 
         "call": "<zc.async.job.Job (oid 36, db 'unnamed') ``zc.async.doctest_test.wait_for_me()``>", 
         "completed": "2006-08-10T15:44:...Z", 
         "failed": false, 
         "poll id": 6420106067941005007, 
+        "queue": "", 
         "quota names": [], 
+        "reassigned": false, 
         "result": "None", 
         "started": "2006-08-10T15:44:...Z", 
         "thread": ...

Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/testing.py	2008-09-05 01:14:23 UTC (rev 90856)
@@ -253,6 +253,22 @@
     else:
         assert False, 'annotation never found'
 
+def shut_down_and_wait(dispatcher):
+    threads = []
+    for queue_pools in dispatcher.queues.values():
+        for pool in queue_pools.values():
+            threads.extend(pool.threads)
+    dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
+    dispatcher.thread.join(3)
+    # in most cases, this is unnecessary, but in some instances, such as in
+    # some examples in catastrophes.txt, this is needed.
+    for queue_pools in dispatcher.queues.values():
+        for pool in queue_pools.values():
+           pool.setSize(0)
+    # this makes sure that all the worker threads have a chance to stop.
+    for thread in threads:
+        thread.join(3)
+
 def print_logs(log_file=sys.stdout, log_level=logging.CRITICAL):
     # really more of a debugging tool
     logger = logging.getLogger('zc.async')
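
The new ``shut_down_and_wait`` helper is what ``ftesting.py``'s teardown now
calls; a test suite's own teardown can use it the same way.  A minimal sketch,
assuming the usual single in-process dispatcher::

    import zc.async.dispatcher
    import zc.async.testing

    def tearDown(test):
        dispatcher = zc.async.dispatcher.get()
        # stop the reactor thread, drain the pools, and join the worker threads
        zc.async.testing.shut_down_and_wait(dispatcher)
        zc.async.dispatcher.clear()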

Modified: zc.async/trunk/src/zc/async/tests.py
===================================================================
--- zc.async/trunk/src/zc/async/tests.py	2008-09-05 00:01:26 UTC (rev 90855)
+++ zc.async/trunk/src/zc/async/tests.py	2008-09-05 01:14:23 UTC (rev 90856)
@@ -148,6 +148,7 @@
             'README_1.txt',
             'README_2.txt',
             'catastrophes.txt',
+            'catastrophes_revisited.txt',
             'ftesting.txt',
             'QUICKSTART_1_VIRTUALENV.txt',
             setUp=modSetUp, tearDown=modTearDown,


