[Checkins] SVN: zc.async/trunk/src/zc/async/ Tests and small addition for jobs.

Gary Poster gary at modernsongs.com
Thu Sep 18 21:15:06 EDT 2008


Log message for revision 91246:
  Tests and small addition for jobs.
  
  Added "count" argument to jobs command.  Added two of three ordering tests for
  jobs.
  
  

Changed:
  U   zc.async/trunk/src/zc/async/monitordb.py
  U   zc.async/trunk/src/zc/async/monitordb.txt
  U   zc.async/trunk/src/zc/async/utils.py

-=-
Modified: zc.async/trunk/src/zc/async/monitordb.py
===================================================================
--- zc.async/trunk/src/zc/async/monitordb.py	2008-09-18 22:57:10 UTC (rev 91245)
+++ zc.async/trunk/src/zc/async/monitordb.py	2008-09-19 01:15:04 UTC (rev 91246)
@@ -124,8 +124,10 @@
                         break
                 else:
                     yield j
+        
+        now = datetime.datetime.now(pytz.UTC)
         def agent_key(job):
-            return (job.active_start or job.begin_after).isoformat()
+            return (job.active_start or now).isoformat()
         agent_sources = []
         sources.append((agent_sources, agent_key))
     if completed:
@@ -309,6 +311,10 @@
       reprs of the jobs are used instead.  If the display value is "detail,"
       a dictionary of details is used for each job.
 
+    - "count": By default, or with a value of 0, this will include all jobs
+      matching the filter.  If you provide a count (a positive integer), only
+      a maximum of the given "count" items will be listed.
+
     Usage Examples
     ==============
     
@@ -330,6 +336,10 @@
         last hour that called a function or method that began with the string
         "import_")
 
+        asyncdb job pending count:3 display:repr
+        (lists the job reprs for the three pending jobs next in line to be
+        performed)
+
     Here are some examples of how the duration-based filters work.
     
     * If you used "start:since5s" then that could be read as "jobs that
@@ -348,7 +358,10 @@
       "since" do not matter.)
     """
     display = kwargs.pop('display', 'default').lower()
+    count = int(kwargs.pop('count', 0))
     res = _jobs(context, states, **kwargs)
+    if count:
+        res = zc.async.utils.takecount(res, count)
     if display == 'default':
         return res
     elif display == 'repr':
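
Editorial note on the agent_key change above: a job that has been assigned to an agent but has no ``active_start`` yet is now keyed as if it started "now", so it sorts after jobs that are already running. The following is a minimal sketch of that ordering, not the module's code. ``FakeJob`` and ``make_agent_key`` are hypothetical names, and the standard library's ``datetime.timezone.utc`` stands in for the ``pytz.UTC`` used in the diff.

```python
import datetime

UTC = datetime.timezone.utc  # assumption: stdlib UTC instead of pytz.UTC


class FakeJob:
    """Hypothetical stand-in for a job; only the attribute the key reads."""

    def __init__(self, name, active_start=None):
        self.name = name
        self.active_start = active_start


def make_agent_key(now):
    # Same expression as the changed agent_key in the diff: fall back to
    # "now" when the job has no active_start.
    def agent_key(job):
        return (job.active_start or now).isoformat()
    return agent_key


now = datetime.datetime(2008, 9, 19, 1, 15, 4, tzinfo=UTC)
jobs = [
    FakeJob("assigned, not yet active"),
    FakeJob("started a minute ago", now - datetime.timedelta(minutes=1)),
    FakeJob("started an hour ago", now - datetime.timedelta(hours=1)),
]

# Oldest start first; the job without an active_start lands last.
ordered = [job.name for job in sorted(jobs, key=make_agent_key(now))]
```

With these sample values the not-yet-active job ends up at the end of the list, which matches the "oldest started to newest" ordering described in monitordb.txt.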

Modified: zc.async/trunk/src/zc/async/monitordb.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitordb.txt	2008-09-18 22:57:10 UTC (rev 91245)
+++ zc.async/trunk/src/zc/async/monitordb.txt	2008-09-19 01:15:04 UTC (rev 91246)
@@ -212,8 +212,11 @@
 For the full details example, we only show the first job, in the interest of
 space.
 
-    >>> connection.test_input('asyncdb jobs pending display:details\n')
-    ... # doctest: +ELLIPSIS
+(Because we limit this to a "count" of 1, this is essentially equivalent to
+``nextpending``, except it takes a lot longer to write and it has a matched
+pair of square brackets on the outside.)
+
+    >>> connection.test_input('asyncdb jobs pending display:details count:1\n')
     [
         {
             "active": null, 
@@ -235,7 +238,7 @@
             "wait": {
                 "seconds": 0.0
             }
-        }, ...
+        }
     ] 
     -> CLOSE
 
@@ -822,7 +825,7 @@
 
     >>> ignore = reactor.time_flies(dispatcher.poll_interval)
     >>> active_lock2.release()
-    >>> time.sleep(0.2)
+    >>> time.sleep(0.4)
 
 Now, how many jobs have completed?
 
@@ -1135,8 +1138,6 @@
 Initially, before another poll, the dispatchers will not be activated in the
 new queue.
 
-
-
     >>> connection.test_input('asyncdb status\n')
     {
         "": {
@@ -1249,10 +1250,95 @@
     0 
     -> CLOSE
 
+It's worth noting that ``jobs``, ``lastcompleted`` and ``nextpending`` all rely
+on merging pending tasks across queues, and merging in-agent and completed
+tasks across agents, while keeping ordering.
+
+The pending tasks are almost always arranged effectively in order of the
+``begin_after`` value of each job.  In rare cases, when interrupted tasks are
+scheduled for a retry, this may get a bit out of order, but the order does
+reflect the exact order of the jobs the queue offers.  ``begin_after`` is used
+for the merge across queues, if that is necessary.
+
+    >>> connection.test_input('asyncdb jobs pending display:repr\n')
+    [
+        "<zc.async.job.Job (oid 35, db 'unnamed') ``__builtin__.sum((18, 18, 6))``>"
+    ] 
+    -> CLOSE
+
+    >>> j = alt_queue.put(zc.async.job.Job(sum_silly, 41, 41))
+    >>> transaction.commit()
+    >>> connection.test_input('asyncdb jobs pending display:repr\n')
+    [
+        "<zc.async.job.Job (oid 35, db 'unnamed') ``__builtin__.sum((18, 18, 6))``>", 
+        "<zc.async.job.Job (oid 107, db 'unnamed') ``zc.async.doctest_test.sum_silly(41, 41)``>"
+    ] 
+    -> CLOSE
+
+    >>> ignore = reactor.time_flies(1)
+    >>> j = queue.put(send_message)
+    >>> transaction.commit()
+    >>> connection.test_input('asyncdb jobs pending display:repr\n')
+    [
+        "<zc.async.job.Job (oid 35, db 'unnamed') ``__builtin__.sum((18, 18, 6))``>", 
+        "<zc.async.job.Job (oid 107, db 'unnamed') ``zc.async.doctest_test.sum_silly(41, 41)``>", 
+        "<zc.async.job.Job (oid 113, db 'unnamed') ``zc.async.doctest_test.send_message()``>"
+    ] 
+    -> CLOSE
+
+The agents are generally ordered from oldest started to newest, and use
+``active_start`` or now (assuming that the task is assigned but not yet active
+if ``active_start`` is not set).
+
+    >>> connection.test_input('asyncdb jobs active callbacks display:repr\n')
+    [
+        "<zc.async.job.Job (oid 31, db 'unnamed') ``zc.async.doctest_test.active_pause()``>", 
+        "<zc.async.job.Job (oid 32, db 'unnamed') ``zc.async.doctest_test.sum_silly(18, 18, start=6)``>", 
+        "<zc.async.job.Job (oid 84, db 'unnamed') ``zc.async.doctest_test.active_pause2()``>"
+    ] 
+    -> CLOSE
+
+    >>> active_lock.release()
+    >>> time.sleep(0.4)
+    >>> agent = zc.async.agent.Agent()
+    >>> alt_queue.dispatchers[alt_dispatcher.UUID]['main'] = agent
+    >>> j = alt_queue.put(active_pause)
+    >>> transaction.commit()
+    >>> ignore = reactor.time_flies(dispatcher.poll_interval)
+    >>> time.sleep(0.4)
+    >>> connection.test_input('asyncdb jobs active callbacks display:repr\n')
+    [
+        "<zc.async.job.Job (oid 32, db 'unnamed') ``zc.async.doctest_test.sum_silly(18, 18, start=6)``>", 
+        "<zc.async.job.Job (oid 84, db 'unnamed') ``zc.async.doctest_test.active_pause2()``>", 
+        "<zc.async.job.Job (oid 118, db 'unnamed') ``zc.async.doctest_test.active_pause()``>"
+    ] 
+    -> CLOSE
+
     >>> active_lock2.release()
+    >>> time.sleep(0.4)
+    >>> j = alt_queue.put(active_pause2)
+    >>> transaction.commit()
+    >>> ignore = reactor.time_flies(dispatcher.poll_interval)
+    >>> time.sleep(0.4)
+    >>> connection.test_input('asyncdb jobs active callbacks display:repr\n')
+    [
+        "<zc.async.job.Job (oid 32, db 'unnamed') ``zc.async.doctest_test.sum_silly(18, 18, start=6)``>", 
+        "<zc.async.job.Job (oid 118, db 'unnamed') ``zc.async.doctest_test.active_pause()``>", 
+        "<zc.async.job.Job (oid 135, db 'unnamed') ``zc.async.doctest_test.active_pause2()``>"
+    ] 
+    -> CLOSE
+
+The completed tasks are ordered the most strictly.  They are ordered from most
+recently completed to oldest completed, where "completed" is defined as the
+first time that all callbacks ran to completion.  Therefore, as you might
+guess, ``lastcompleted`` just gets the first job from the merged list that
+``jobs`` uses.
+
+    XXX
+
+    >>> active_lock2.release()
     >>> active_lock.release()
     >>> callback_lock.release()
-    
 
 [#tearDown]_
 

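Editorial note on the cross-queue ordering described in monitordb.txt above: each queue already offers its pending jobs in order, and ``begin_after`` is the key used to merge them. A rough sketch of such a merge follows. It uses ``heapq.merge`` from the standard library as a substitute for the package's own merge helper in utils.py, and the ``Job`` tuple and the integer ``begin_after`` values are illustrative assumptions, not real job objects.

```python
import heapq
from collections import namedtuple

# Hypothetical job record; integers stand in for begin_after datetimes.
Job = namedtuple("Job", "name begin_after")

# Each queue is assumed to be sorted by begin_after already.
main_queue = [Job("sum", 0), Job("send_message", 2)]
alt_queue = [Job("sum_silly", 1)]

# Lazily merge the sorted sources while keeping overall ordering.
merged = heapq.merge(main_queue, alt_queue, key=lambda job: job.begin_after)
names = [job.name for job in merged]
```

The resulting order (sum, sum_silly, send_message) mirrors the three-job listing in the doctest, where the job added to the alternate queue sorts between the two jobs in the main queue.
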
Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py	2008-09-18 22:57:10 UTC (rev 91245)
+++ zc.async/trunk/src/zc/async/utils.py	2008-09-19 01:15:04 UTC (rev 91246)
@@ -376,3 +376,15 @@
             pass
         else:
             bisect.insort(sorted_sources, (key(next), next, iterator))
+
+def takecount(res, count):
+    if count < 0:
+        raise ValueError('count must be a positive integer')
+    if count == 0:
+        return
+    ct = 0
+    for val in res:
+        yield val
+        ct += 1
+        if ct >= count:
+            break
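
Editorial note on takecount above: for positive counts it behaves like ``itertools.islice(res, count)``. One detail worth knowing is that takecount is a generator function, so the negative-count check runs only when the first item is requested, not when takecount is called. The sketch below copies the function from the diff so that it is self-contained; the sample inputs are made up for illustration.

```python
import itertools


def takecount(res, count):
    # Copied from the diff above so the sketch runs on its own.
    if count < 0:
        raise ValueError('count must be a positive integer')
    if count == 0:
        return
    ct = 0
    for val in res:
        yield val
        ct += 1
        if ct >= count:
            break


first_three = list(takecount(iter(range(10)), 3))
same_with_islice = list(itertools.islice(range(10), 3))

# Called directly with 0, the generator yields nothing. The jobs command
# never does this, because it only wraps the result when count is non-zero.
none_taken = list(takecount(range(10), 0))

# The ValueError is deferred until iteration starts.
lazy = takecount(range(10), -1)  # no exception raised here
try:
    next(lazy)
    raised = False
except ValueError:
    raised = True
```

In the jobs command this means a negative "count" would surface as an error only when the result is iterated, for instance while it is being rendered.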


