[Checkins] SVN: zc.async/trunk/src/zc/async/ Tests and small addition for jobs.
Gary Poster
gary at modernsongs.com
Thu Sep 18 21:15:06 EDT 2008
Log message for revision 91246:
Tests and small addition for jobs.
Added "count" argument to jobs command. Added two of three ordering tests for
jobs.
Changed:
U zc.async/trunk/src/zc/async/monitordb.py
U zc.async/trunk/src/zc/async/monitordb.txt
U zc.async/trunk/src/zc/async/utils.py
-=-
Modified: zc.async/trunk/src/zc/async/monitordb.py
===================================================================
--- zc.async/trunk/src/zc/async/monitordb.py 2008-09-18 22:57:10 UTC (rev 91245)
+++ zc.async/trunk/src/zc/async/monitordb.py 2008-09-19 01:15:04 UTC (rev 91246)
@@ -124,8 +124,10 @@
break
else:
yield j
+
+ now = datetime.datetime.now(pytz.UTC)
def agent_key(job):
- return (job.active_start or job.begin_after).isoformat()
+ return (job.active_start or now).isoformat()
agent_sources = []
sources.append((agent_sources, agent_key))
if completed:
@@ -309,6 +311,10 @@
reprs of the jobs are used instead. If the display value is "detail,"
a dictionary of details is used for each job.
+ - "count": By default, or with a value of 0, this will include all jobs
+ matching the filter. If you provide a count (a positive integer), only
+ a maximum of the given "count" items will be listed.
+
Usage Examples
==============
@@ -330,6 +336,10 @@
last hour that called a function or method that began with the string
"import_")
+ asyncdb job pending count:3 display:repr
+ (lists the job reprs for the three pending jobs next in line to be
+ performed)
+
Here are some examples of how the duration-based filters work.
* If you used "start:since5s" then that could be read as "jobs that
@@ -348,7 +358,10 @@
"since" do not matter.)
"""
display = kwargs.pop('display', 'default').lower()
+ count = int(kwargs.pop('count', 0))
res = _jobs(context, states, **kwargs)
+ if count:
+ res = zc.async.utils.takecount(res, count)
if display == 'default':
return res
elif display == 'repr':
Modified: zc.async/trunk/src/zc/async/monitordb.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitordb.txt 2008-09-18 22:57:10 UTC (rev 91245)
+++ zc.async/trunk/src/zc/async/monitordb.txt 2008-09-19 01:15:04 UTC (rev 91246)
@@ -212,8 +212,11 @@
For the full details example, we only show the first job, in the interest of
space.
- >>> connection.test_input('asyncdb jobs pending display:details\n')
- ... # doctest: +ELLIPSIS
+(Because we limit this to a "count" of 1, this is essentially equivalent to
+``nextpending``, except it takes a lot longer to write and it has a matched
+pair of square brackets on the outside.)
+
+ >>> connection.test_input('asyncdb jobs pending display:details count:1\n')
[
{
"active": null,
@@ -235,7 +238,7 @@
"wait": {
"seconds": 0.0
}
- }, ...
+ }
]
-> CLOSE
@@ -822,7 +825,7 @@
>>> ignore = reactor.time_flies(dispatcher.poll_interval)
>>> active_lock2.release()
- >>> time.sleep(0.2)
+ >>> time.sleep(0.4)
Now, how many jobs have completed?
@@ -1135,8 +1138,6 @@
Initially, before another poll, the dispatchers will not be activated in the
new queue.
-
-
>>> connection.test_input('asyncdb status\n')
{
"": {
@@ -1249,10 +1250,95 @@
0
-> CLOSE
+It's worth noting that ``jobs``, ``lastcompleted`` and ``nextpending`` all rely
+on merging pending tasks across queues, and merging in-agent and completed
+tasks across agents, while keeping ordering.
+
+The pending tasks are almost always arranged effectively in order of the
+``begin_after`` value of each job. In rare cases, when interrupted tasks are
+scheduled for a retry, this may get a bit out of order, but the order does
+reflect the exact order of the jobs the queue offers. ``begin_after`` is used
+for the merge across queues, if that is necessary.
+
+ >>> connection.test_input('asyncdb jobs pending display:repr\n')
+ [
+ "<zc.async.job.Job (oid 35, db 'unnamed') ``__builtin__.sum((18, 18, 6))``>"
+ ]
+ -> CLOSE
+
+ >>> j = alt_queue.put(zc.async.job.Job(sum_silly, 41, 41))
+ >>> transaction.commit()
+ >>> connection.test_input('asyncdb jobs pending display:repr\n')
+ [
+ "<zc.async.job.Job (oid 35, db 'unnamed') ``__builtin__.sum((18, 18, 6))``>",
+ "<zc.async.job.Job (oid 107, db 'unnamed') ``zc.async.doctest_test.sum_silly(41, 41)``>"
+ ]
+ -> CLOSE
+
+ >>> ignore = reactor.time_flies(1)
+ >>> j = queue.put(send_message)
+ >>> transaction.commit()
+ >>> connection.test_input('asyncdb jobs pending display:repr\n')
+ [
+ "<zc.async.job.Job (oid 35, db 'unnamed') ``__builtin__.sum((18, 18, 6))``>",
+ "<zc.async.job.Job (oid 107, db 'unnamed') ``zc.async.doctest_test.sum_silly(41, 41)``>",
+ "<zc.async.job.Job (oid 113, db 'unnamed') ``zc.async.doctest_test.send_message()``>"
+ ]
+ -> CLOSE
+
+The agents are generally ordered from oldest started to newest, and use
+``active_start`` or now (assuming that the task is assigned but not yet active
+if ``active_start`` is not set).
+
+ >>> connection.test_input('asyncdb jobs active callbacks display:repr\n')
+ [
+ "<zc.async.job.Job (oid 31, db 'unnamed') ``zc.async.doctest_test.active_pause()``>",
+ "<zc.async.job.Job (oid 32, db 'unnamed') ``zc.async.doctest_test.sum_silly(18, 18, start=6)``>",
+ "<zc.async.job.Job (oid 84, db 'unnamed') ``zc.async.doctest_test.active_pause2()``>"
+ ]
+ -> CLOSE
+
+ >>> active_lock.release()
+ >>> time.sleep(0.4)
+ >>> agent = zc.async.agent.Agent()
+ >>> alt_queue.dispatchers[alt_dispatcher.UUID]['main'] = agent
+ >>> j = alt_queue.put(active_pause)
+ >>> transaction.commit()
+ >>> ignore = reactor.time_flies(dispatcher.poll_interval)
+ >>> time.sleep(0.4)
+ >>> connection.test_input('asyncdb jobs active callbacks display:repr\n')
+ [
+ "<zc.async.job.Job (oid 32, db 'unnamed') ``zc.async.doctest_test.sum_silly(18, 18, start=6)``>",
+ "<zc.async.job.Job (oid 84, db 'unnamed') ``zc.async.doctest_test.active_pause2()``>",
+ "<zc.async.job.Job (oid 118, db 'unnamed') ``zc.async.doctest_test.active_pause()``>"
+ ]
+ -> CLOSE
+
>>> active_lock2.release()
+ >>> time.sleep(0.4)
+ >>> j = alt_queue.put(active_pause2)
+ >>> transaction.commit()
+ >>> ignore = reactor.time_flies(dispatcher.poll_interval)
+ >>> time.sleep(0.4)
+ >>> connection.test_input('asyncdb jobs active callbacks display:repr\n')
+ [
+ "<zc.async.job.Job (oid 32, db 'unnamed') ``zc.async.doctest_test.sum_silly(18, 18, start=6)``>",
+ "<zc.async.job.Job (oid 118, db 'unnamed') ``zc.async.doctest_test.active_pause()``>",
+ "<zc.async.job.Job (oid 135, db 'unnamed') ``zc.async.doctest_test.active_pause2()``>"
+ ]
+ -> CLOSE
+
+The completed tasks are ordered the most strictly. They are ordered from most
+recently completed to oldest completed, where "completed" is defined as the
+first time that all callbacks ran to completion. Therefore, as you might
+guess, ``lastcompleted`` just gets the first job from the merged list that
+``jobs`` uses.
+
+ XXX
+
+ >>> active_lock2.release()
>>> active_lock.release()
>>> callback_lock.release()
-
[#tearDown]_
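A rough illustration of the ordering described in the monitordb.txt additions above, where in-agent jobs are merged across agents by ``active_start``, falling back to "now" for jobs that are assigned but not yet active. This is only a sketch under assumptions: ``FakeJob`` and the sample data are hypothetical, ``datetime.timezone.utc`` stands in for ``pytz.UTC``, and ``heapq.merge`` is swapped in for the package's own merge helper in utils.py.

```python
import datetime
import heapq
from collections import namedtuple

# Hypothetical stand-in for a job; only the attribute the key needs.
FakeJob = namedtuple('FakeJob', 'name active_start')

# Assumption: stdlib UTC instead of pytz.UTC, to keep the sketch self-contained.
now = datetime.datetime.now(datetime.timezone.utc)


def agent_key(job):
    # Same expression as the key in this revision: a job that has no
    # active_start yet is treated as if it started "now".
    return (job.active_start or now).isoformat()


def minutes_ago(n):
    return now - datetime.timedelta(minutes=n)


# Each agent's jobs are already in order by the key.
agent_a = [FakeJob('a1', minutes_ago(30)), FakeJob('a2', None)]
agent_b = [FakeJob('b1', minutes_ago(20)), FakeJob('b2', minutes_ago(10))]

# Lazily merge the per-agent sequences while keeping the key order.
merged = [job.name for job in heapq.merge(agent_a, agent_b, key=agent_key)]
```

With this data the not-yet-active job ``a2`` sorts last, after every job that has a real ``active_start``.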
Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py 2008-09-18 22:57:10 UTC (rev 91245)
+++ zc.async/trunk/src/zc/async/utils.py 2008-09-19 01:15:04 UTC (rev 91246)
@@ -376,3 +376,15 @@
pass
else:
bisect.insort(sorted_sources, (key(next), next, iterator))
+
+def takecount(res, count):
+ if count < 0:
+ raise ValueError('count must be a positive integer')
+ if count == 0:
+ return
+ ct = 0
+ for val in res:
+ yield val
+ ct += 1
+ if ct >= count:
+ break
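A short sketch of how ``takecount`` behaves, for readers of this diff. The function body is copied from the revision above; the sample data and the comparison with ``itertools.islice`` are illustrative assumptions, not part of the commit.

```python
import itertools


def takecount(res, count):
    # Same logic as the function added to utils.py in this revision.
    if count < 0:
        raise ValueError('count must be a positive integer')
    if count == 0:
        return
    ct = 0
    for val in res:
        yield val
        ct += 1
        if ct >= count:
            break


# Stand-in for the lazily produced, merged job iterator.
jobs = iter(range(10))
first_three = list(takecount(jobs, 3))

# Only three items were consumed from the source iterator.
next_unconsumed = next(jobs)

# For a positive count, itertools.islice yields the same prefix.
same_as_islice = first_three == list(itertools.islice(range(10), 3))

# takecount is a generator, so the ValueError for a negative count is
# raised on first iteration rather than at call time.
gen = takecount(range(10), -1)
try:
    next(gen)
except ValueError:
    raised_on_iteration = True
else:
    raised_on_iteration = False
```

Note that calling ``takecount`` directly with a count of 0 yields nothing. The ``jobs`` command gets its "0 means all jobs" behavior by skipping ``takecount`` entirely when ``count`` is falsy.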