[Checkins] SVN: zc.async/branches/dev/ needs much more monitoring
tests, but this is a checkpoint.
Gary Poster
gary at zope.com
Tue Apr 1 23:11:16 EDT 2008
Log message for revision 85059:
needs much more monitoring tests, but this is a checkpoint.
Changed:
U zc.async/branches/dev/buildout.cfg
U zc.async/branches/dev/setup.py
U zc.async/branches/dev/src/zc/async/README.txt
U zc.async/branches/dev/src/zc/async/dispatcher.py
U zc.async/branches/dev/src/zc/async/dispatcher.txt
U zc.async/branches/dev/src/zc/async/instanceuuid.py
U zc.async/branches/dev/src/zc/async/job.py
U zc.async/branches/dev/src/zc/async/job.txt
A zc.async/branches/dev/src/zc/async/monitor.py
U zc.async/branches/dev/src/zc/async/monitor.txt
U zc.async/branches/dev/src/zc/async/testing.py
U zc.async/branches/dev/src/zc/async/utils.py
A zc.async/branches/dev/src/zc/async/z3tests.py
-=-
Modified: zc.async/branches/dev/buildout.cfg
===================================================================
--- zc.async/branches/dev/buildout.cfg 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/buildout.cfg 2008-04-02 03:11:14 UTC (rev 85059)
@@ -2,6 +2,8 @@
parts =
interpreter
test
+ z3interpreter
+ z3test
develop = .
@@ -13,10 +15,21 @@
[test]
recipe = zc.recipe.testrunner
eggs = zc.async
-defaults = '--tests-pattern [fn]?tests --exit-with-status -1'.split()
+defaults = '--tests-pattern ^[fn]?tests --exit-with-status -1'.split()
working-directory = ${buildout:directory}
+[z3test]
+recipe = zc.recipe.testrunner
+eggs = zc.async [z3]
+defaults = "--tests-pattern z3tests --exit-with-status -1".split()
+
+
[interpreter]
recipe = zc.recipe.egg
eggs = zc.async
interpreter = py
+
+[z3interpreter]
+recipe = zc.recipe.egg
+eggs = zc.async [z3]
+interpreter = z3py
Modified: zc.async/branches/dev/setup.py
===================================================================
--- zc.async/branches/dev/setup.py 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/setup.py 2008-04-02 03:11:14 UTC (rev 85059)
@@ -20,10 +20,8 @@
'uuid',
'zc.queue',
'zc.dict>=1.2.1',
- 'zc.twist>=1.0.1',
- 'zc.twisted', # setup-friendly Twisted distro. Someday soon we can
- # discard zc.twisted, hopefully. See
- # http://twistedmatrix.com/trac/ticket/1286
+ 'zc.twist>=1.1',
+ 'Twisted>=8.0.1', # 8.0 was setuptools compatible
'zope.bforest>=1.1',
'zope.component',
'zope.i18nmessageid',
@@ -31,5 +29,10 @@
'zope.testing',
'rwproperty',
],
+ extras_require={
+ 'z3':[
+ 'zc.z3monitor',
+ 'simplejson',
+ ]},
include_package_data=True,
)
Modified: zc.async/branches/dev/src/zc/async/README.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README.txt 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/README.txt 2008-04-02 03:11:14 UTC (rev 85059)
@@ -530,14 +530,17 @@
True
>>> job.status == zc.async.interfaces.ACTIVE
True
+
+[#stats_1]_
+
>>> job.annotations['zc.async.test.flag'] = True
>>> transaction.commit()
>>> wait_for(job)
>>> job.result
42
-``getReactor`` and ``getDispatcher`` are for advanced use cases and are not
-explored further here.
+[#stats_2]_ ``getReactor`` and ``getDispatcher`` are for advanced use
+cases and are not explored further here.
Job Quotas
==========
@@ -902,7 +905,7 @@
the original requester. The 2.x line addresses this with three
changes:
- + jobss are annotatable;
+ + jobs are annotatable;
+ jobs should not be modified in an asynchronous
worker that does work (though they may be read);
@@ -1092,6 +1095,95 @@
errors, do not request an annotation that might be a persistent
object.*
+.. [#stats_1] The dispatcher has a ``getStatistics`` method. Its output also
+ shows that there is an active task.
+
+ >>> import pprint
+ >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
+ {'failed': 2,
+ 'longest active': ('\x00...', 'unnamed'),
+ 'longest failed': ('\x00...', 'unnamed'),
+ 'longest successful': ('\x00...', 'unnamed'),
+ 'shortest active': ('\x00\...', 'unnamed'),
+ 'shortest failed': ('\x00\...', 'unnamed'),
+ 'shortest successful': ('\x00...', 'unnamed'),
+ 'started': 12,
+ 'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
+ 'statistics start': datetime.datetime(2006, 8, 10, 15, 56, 47, 211),
+ 'successful': 9,
+ 'unknown': 0}
+
+ We can also see the active job with ``getActiveJobIds``.
+
+ >>> job_ids = dispatcher.getActiveJobIds()
+ >>> len(job_ids)
+ 1
+ >>> info = dispatcher.getJobInfo(*job_ids[0])
+ >>> pprint.pprint(info) # doctest: +ELLIPSIS
+ {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
+ 'completed': None,
+ 'failed': False,
+ 'poll id': ...,
+ 'quota names': (),
+ 'result': None,
+ 'started': datetime.datetime(...),
+ 'thread': ...}
+ >>> info['thread'] is not None
+ True
+ >>> info['poll id'] is not None
+ True
+
+
+.. [#stats_2] Now the task is done, as the stats reflect.
+
+ >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
+ {'failed': 2,
+ 'longest active': None,
+ 'longest failed': ('\x00...', 'unnamed'),
+ 'longest successful': ('\x00...', 'unnamed'),
+ 'shortest active': None,
+ 'shortest failed': ('\x00\...', 'unnamed'),
+ 'shortest successful': ('\x00...', 'unnamed'),
+ 'started': 12,
+ 'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
+ 'statistics start': datetime.datetime(...),
+ 'successful': 10,
+ 'unknown': 0}
+
+ The ``getActiveJobIds`` list shows the new task--which is completed, but
+ not as of the last poll, so it's still in the list.
+
+ >>> job_ids = dispatcher.getActiveJobIds()
+ >>> len(job_ids)
+ 1
+ >>> info = dispatcher.getJobInfo(*job_ids[0])
+ >>> pprint.pprint(info) # doctest: +ELLIPSIS
+ {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``zc.async.doctest_test.annotateStatus()``>",
+ 'completed': datetime.datetime(...),
+ 'failed': False,
+ 'poll id': ...,
+ 'quota names': (),
+ 'result': '42',
+ 'started': datetime.datetime(...),
+ 'thread': ...}
+ >>> info['thread'] is not None
+ True
+ >>> info['poll id'] is not None
+ True
+
.. [#stop_reactor]
+ >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
+ {'failed': 2,
+ 'longest active': None,
+ 'longest failed': ('\x00...', 'unnamed'),
+ 'longest successful': ('\x00...', 'unnamed'),
+ 'shortest active': None,
+ 'shortest failed': ('\x00\...', 'unnamed'),
+ 'shortest successful': ('\x00...', 'unnamed'),
+ 'started': 24,
+ 'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
+ 'statistics start': datetime.datetime(2006, 8, 10, 15, 57, 47, 211),
+ 'successful': 22,
+ 'unknown': 0}
>>> reactor.stop()
Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py 2008-04-02 03:11:14 UTC (rev 85059)
@@ -1,5 +1,6 @@
import time
import datetime
+import bisect
import Queue
import thread
import threading
@@ -7,9 +8,11 @@
import twisted.python.failure
import twisted.internet.defer
import ZODB.POSException
+import BTrees
import transaction
import transaction.interfaces
import zope.component
+import zope.bforest.periodic
import zc.twist
import zc.async.utils
@@ -116,11 +119,15 @@
job = self.queue.get()
while job is not None:
db, identifier, info = job
+ info['thread'] = thread.get_ident()
+ info['started'] = datetime.datetime.utcnow()
+ zc.async.utils.tracelog.info(
+ 'starting in thread %d: %r',
+ info['thread'], info['call'])
conn = db.open()
try:
transaction.begin()
job = conn.get(identifier)
- info['thread'] = thread.get_ident()
local.job = job
try:
job() # this does the committing and retrying, largely
@@ -134,10 +141,20 @@
transaction.abort() # retry forever (!)
else:
break
+ # should come before 'completed' for threading dance
+ if isinstance(job.result, twisted.python.failure.Failure):
+ info['failed'] = True
+ info['completed'] = datetime.datetime.utcnow()
+ info['result'] = repr(job.result)
finally:
local.job = None
transaction.abort()
conn.close()
+ zc.async.utils.tracelog.info(
+ '%s %s in thread %d with result %s',
+ info['call'],
+ info['failed'] and 'failed' or 'succeeded',
+ info['thread'], info['result'])
job = self.queue.get()
finally:
if self.dispatcher.activated:
@@ -172,21 +189,42 @@
self.queue.put(None)
return size - old # size difference
+# this is mostly for testing
+_dispatchers = {}
+
+def get(uuid, default=None):
+ if uuid is None:
+ uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+ return _dispatchers.get(uuid, default)
+
+def pop(uuid):
+ if uuid is None:
+ uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+ return _dispatchers.pop(uuid)
+
+clear = _dispatchers.clear
+
class Dispatcher(object):
activated = False
+ conn = None
def __init__(self, db, reactor, poll_interval=5, uuid=None):
+ if uuid is None:
+ uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
+ if uuid in _dispatchers:
+ raise ValueError('dispatcher for this UUID is already registered')
+ _dispatchers[uuid] = self
self.db = db
self.reactor = reactor # we may allow the ``reactor`` argument to be
# None at some point, to default to the installed Twisted reactor.
self.poll_interval = poll_interval
- if uuid is None:
- uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
self.UUID = uuid
self.polls = zc.async.utils.Periodic(
- period=datetime.timedelta(days=1), buckets=4)
+ period=datetime.timedelta(hours=2), buckets=3)
+ self.jobs = zope.bforest.periodic.OOBForest(
+ period=datetime.timedelta(hours=3), count=4)
self._activated = set()
self.queues = {}
self.dead_pools = []
@@ -214,6 +252,7 @@
agent.queue.name,
agent.queue._p_oid))
if res is None:
+ # Successful commit
res = job
return res
@@ -246,10 +285,11 @@
break
def poll(self):
- conn = self.db.open()
poll_info = PollInfo()
+ started_jobs = []
+ transaction.begin() # sync and clear
try:
- queues = conn.root().get(zc.async.interfaces.KEY)
+ queues = self.conn.root().get(zc.async.interfaces.KEY)
if queues is None:
transaction.abort()
return
@@ -284,14 +324,17 @@
pools = self.queues[queue.name] = {}
for name, agent in da.items():
job_info = []
+ active_jobs = [
+ (job._p_oid,
+ getattr(job._p_jar.db(), 'database_name', None))
+ for job in agent]
agent_info = queue_info[name] = {
'size': None, 'len': None, 'error': None,
- 'new_jobs': job_info}
+ 'new jobs': job_info, 'active jobs': active_jobs}
try:
agent_info['size'] = agent.size
agent_info['len'] = len(agent)
except zc.twist.EXPLOSIVE_ERRORS:
- transaction.abort()
raise
except:
agent_info['error'] = zc.twist.sanitize(
@@ -316,7 +359,6 @@
try:
agent.failure = res
except zc.twist.EXPLOSIVE_ERRORS:
- transaction.abort()
raise
except:
transaction.abort()
@@ -326,13 +368,20 @@
# TODO improve msg
self._commit('trying to stash failure on agent')
else:
- info = {'oid': job._p_oid,
- 'callable': repr(job.callable),
- 'begin_after': job.begin_after.isoformat(),
- 'quota_names': job.quota_names,
- 'assignerUUID': job.assignerUUID,
+ info = {'result': None,
+ 'failed': False,
+ 'poll id': None,
+ 'quota names': job.quota_names,
+ 'call': repr(job),
+ 'started': None,
+ 'completed': None,
'thread': None}
- job_info.append(info)
+ started_jobs.append(info)
+ dbname = getattr(
+ job._p_jar.db(), 'database_name', None)
+ jobid = (job._p_oid, dbname)
+ self.jobs[jobid] = info
+ job_info.append(jobid)
pool.queue.put(
(job._p_jar.db(), job._p_oid, info))
job = self._getJob(agent)
@@ -366,8 +415,11 @@
self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
finally:
transaction.abort()
- conn.close()
self.polls.add(poll_info)
+ for info in started_jobs:
+ info['poll id'] = poll_info.key
+ zc.async.utils.tracelog.info(
+ 'poll %s: %r', poll_info.key, poll_info)
def directPoll(self):
if not self.activated:
@@ -395,8 +447,14 @@
def activate(self, threaded=False):
if self.activated:
raise ValueError('already activated')
- self.activated = True
+ self.activated = datetime.datetime.utcnow()
+ # in case this is a restart, we clear old data
+ self.polls.clear()
+ self.jobs.clear()
+ # increase pool size to account for the dispatcher poll
self.db.setPoolSize(self.db.getPoolSize() + 1)
+ self.conn = self.db.open() # we keep the same connection for all
+ # polls as an optimization
if threaded:
self.reactor.callWhenRunning(self.threadedPoll)
else:
@@ -408,9 +466,9 @@
if not self.activated:
raise ValueError('not activated')
self.activated = False
- conn = self.db.open()
+ transaction.begin()
try:
- queues = conn.root().get(zc.async.interfaces.KEY)
+ queues = self.conn.root().get(zc.async.interfaces.KEY)
if queues is not None:
for queue in queues.values():
da = queue.dispatchers.get(self.UUID)
@@ -419,7 +477,7 @@
self._commit('trying to tear down')
finally:
transaction.abort()
- conn.close()
+ self.conn.close()
conn_delta = 0
for queue_pools in self.queues.values():
for name, pool in queue_pools.items():
@@ -427,3 +485,202 @@
self.dead_pools.append(queue_pools.pop(name))
conn_delta -= 1
self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
+
+ # these methods are used for monitoring and analysis
+
+ STOPPED = 'STOPPED'
+ RUNNING = 'RUNNING'
+ STUCK = 'STUCK'
+ STARTING = 'STARTING'
+
+ def getStatusInfo(self):
+ res = {'time since last poll': None, 'uptime': None, 'uuid': self.UUID}
+ poll_interval = res['poll interval'] = datetime.timedelta(
+ seconds=self.poll_interval)
+ if not self.activated:
+ res['status'] = self.STOPPED
+ else:
+ now = datetime.datetime.utcnow()
+ try:
+ poll = self.polls.first()
+ except ValueError:
+ # no polls
+ next = self.activated + poll_interval
+ if next < now:
+ res['status'] = self.STUCK
+ else:
+ res['status'] = self.STARTING
+ res['time since last poll'] = now - self.activated
+ else:
+ next = poll.utc_timestamp + poll_interval
+ if next < now:
+ res['status'] = self.STUCK
+ else:
+ res['status'] = self.RUNNING
+ res['time since last poll'] = now - poll.utc_timestamp
+ res['uptime'] = now - self.activated
+ return res
+
+ def getJobInfo(self, oid, database_name=None):
+ if database_name is None:
+ # these will raise ValueErrors for unknown oids. We'll let 'em.
+ minKey = self.jobs.minKey((oid,))
+ maxKey = self.jobs.maxKey((oid,))
+ if minKey != maxKey:
+ raise ValueError('ambiguous database name')
+ else:
+ database_name = minKey[1]
+ return self.jobs[(oid, database_name)]
+
+ def getActiveJobIds(self, queue=None, agent=None):
+ """returns active jobs from newest to oldest"""
+ res = []
+ try:
+ poll = self.polls.first()
+ except ValueError:
+ pass
+ else:
+ old = []
+ unknown = []
+ for info in _iter_info(poll, queue, agent):
+ res.extend(info['new jobs'])
+ for job_id in info['active jobs']:
+ job_info = self.jobs.get(job_id)
+ if job_info is None:
+ unknown.append(job_id)
+ else:
+ bisect.insort(old, (job_info['poll id'], job_id))
+ res.extend(i[1] for i in old)
+ res.extend(unknown)
+ return res
+
+ def getPollInfo(self, at=None, before=None):
+ if at is not None:
+ if before is not None:
+ raise ValueError('may only provide one of `at` and `before`')
+ if isinstance(at, datetime.datetime):
+ at = zc.async.utils.dt_to_long(at)
+ elif before is not None:
+ if isinstance(before, datetime.datetime):
+ at = zc.async.utils.dt_to_long(before) + 16
+ else:
+ at = before + 1
+ for bucket in tuple(self.polls._data.buckets): # freeze order
+ try:
+ if at is None:
+ key = bucket.minKey()
+ else:
+ key = bucket.minKey(at)
+ return bucket[key]
+ except (ValueError, KeyError):
+ # ValueError because minKey might not have a value
+ # KeyError because bucket might be cleared in another thread
+ # between minKey and __getitem__
+ pass
+ raise ValueError('no poll matches')
+
+ def iterPolls(self, at=None, before=None, since=None, count=None):
+ # `polls` may be mutated during iteration so we don't iterate over it
+ if at is not None and before is not None:
+ raise ValueError('may only provide one of `at` and `before`')
+ if isinstance(since, datetime.datetime):
+ since = zc.async.utils.dt_to_long(since) + 15
+ ct = 0
+ while 1:
+ if count is not None and ct >= count:
+ break
+ try:
+ info = self.getPollInfo(at=at, before=before)
+ except ValueError:
+ break
+ else:
+ if since is None or before <= since:
+ yield info
+ ct += 1
+ before = info.key
+ at = None
+ else:
+ break
+
+ def getStatistics(self, at=None, before=None, since=None, queue=None,
+ agent=None):
+ if at is not None and before is not None:
+ raise ValueError('may only provide one of `at` and `before`')
+ res = {
+ 'started': 0,
+ 'successful': 0,
+ 'failed': 0,
+ 'unknown': 0
+ }
+ started = successful = failed = unknown = 0
+ _pair = (None, None)
+ successful_extremes = [_pair, _pair]
+ failed_extremes = [_pair, _pair]
+ active_extremes = [_pair, _pair]
+ now = datetime.datetime.utcnow()
+ first = True
+ poll = first_poll = None
+ def process(jobs):
+ for jobid in jobs:
+ jobinfo = self.jobs.get(jobid)
+ if jobinfo is None:
+ res['unknown'] += 1
+ continue
+ if jobinfo['completed']:
+ if jobinfo['failed']:
+ pair = failed_extremes
+ res['failed'] += 1
+ else:
+ pair = successful_extremes
+ res['successful'] += 1
+ else:
+ pair = active_extremes
+ start = jobinfo['started'] or poll_time
+ stop = jobinfo['completed'] or now
+ duration = stop - start
+ if pair[0][0] is None or pair[0][0] > duration:
+ pair[0] = (duration, jobid)
+ if pair[1][0] is None or pair[1][0] < duration:
+ pair[1] = (duration, jobid)
+ for poll in self.iterPolls(at=at, before=before, since=since):
+ poll_time = poll.utc_timestamp
+ for agent_info in _iter_info(poll, queue, agent):
+ res['started'] += len(agent_info['new jobs'])
+ process(agent_info['new jobs'])
+ if first:
+ first = False
+ first_poll = poll
+ if poll is not None:
+ for agent_info in _iter_info(poll, queue, agent):
+ process(agent_info['active jobs'])
+ if first_poll is not None:
+ stat_start = first_poll.utc_timestamp
+ stat_end = poll.utc_timestamp
+ else:
+ start_start = None
+ stat_end = None
+ res.update({
+ 'shortest successful': successful_extremes[0][1],
+ 'longest successful': successful_extremes[1][1],
+ 'shortest failed': failed_extremes[0][1],
+ 'longest failed': failed_extremes[1][1],
+ 'shortest active': active_extremes[0][1],
+ 'longest active': active_extremes[1][1],
+ 'statistics start': stat_start,
+ 'statistics end': stat_end,
+ })
+ return res
+
+def _iter_info(poll, queue, agent):
+ if queue is None:
+ queues = poll.values()
+ elif queue not in poll:
+ queues = []
+ else:
+ queues = [poll[queue]]
+ for q in queues:
+ if agent is None:
+ for i in q.values():
+ yield i
+ elif agent in q:
+ yield q[agent]
Modified: zc.async/branches/dev/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.txt 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/dispatcher.txt 2008-04-02 03:11:14 UTC (rev 85059)
@@ -189,7 +189,11 @@
>>> poll = get_poll()
>>> import pprint
>>> pprint.pprint(dict(poll))
- {'': {'main': {'new_jobs': [], 'error': None, 'len': 0, 'size': 3}}}
+ {'': {'main': {'active jobs': [],
+ 'error': None,
+ 'len': 0,
+ 'new jobs': [],
+ 'size': 3}}}
The pool size for the db has increased again to account for the size of the
agent.
@@ -215,22 +219,39 @@
>>> for poll in dispatcher.polls:
... if (poll.get('') and poll[''].get('main') and
- ... poll['']['main']['new_jobs']):
+ ... poll['']['main']['new jobs']):
... break
... else:
... assert False, 'poll not found'
...
>>> pprint.pprint(dict(poll)) # doctest: +ELLIPSIS
- {'': {'main': {'error': None,
+ {'': {'main': {'active jobs': [],
+ 'error': None,
'len': 0,
- 'new_jobs': [{'assignerUUID': UUID(...),
- 'begin_after': '...',
- 'callable': '<built-in function mul>',
- 'oid': '...',
- 'quota_names': (),
- 'thread': ...}],
+ 'new jobs': [('\x00...', 'unnamed')],
'size': 3}}}
+[#getPollInfo]_ Notice our ``new jobs`` has a value in it now. We can get
+some information about that job from the dispatcher.
+
+ >>> info = dispatcher.getJobInfo(*poll['']['main']['new jobs'][0])
+ >>> pprint.pprint(info)
+ ... # doctest: +ELLIPSIS
+ {'call': "<zc.async.job.Job (oid ..., db 'unnamed') ``<built-in function mul>(14, 3)``>",
+ 'completed': datetime.datetime(...),
+ 'failed': False,
+ 'poll id': ...,
+ 'quota names': (),
+ 'result': '42',
+ 'started': datetime.datetime(...),
+ 'thread': ...}
+ >>> info['thread'] is not None
+ True
+ >>> info['poll id'] is not None
+ True
+
+Notice that the result is a repr.
+
As seen in other documents in zc.async, the job can also be a method of a
persistent object, affecting a persistent object.
@@ -287,6 +308,31 @@
>>> wait_for_result(job4)
"reply is 'HIYA'. Locally it is MISSING."
+We can analyze the work the dispatcher has done. The records for this
+generally only go back about a day.
+
+ >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
+ {'failed': 0,
+ 'longest active': None,
+ 'longest failed': None,
+ 'longest successful': ('\x00...', 'unnamed'),
+ 'shortest active': None,
+ 'shortest failed': None,
+ 'shortest successful': ('\x00...', 'unnamed'),
+ 'started': 5,
+ 'statistics end': datetime.datetime(...),
+ 'statistics start': datetime.datetime(...),
+ 'successful': 5,
+ 'unknown': 0}
+
+We can get a report on the dispatcher's status.
+
+ >>> pprint.pprint(dispatcher.getStatusInfo()) # doctest: +ELLIPSIS
+ {'poll interval': datetime.timedelta(0, 0, 500000),
+ 'status': 'RUNNING',
+ 'time since last poll': datetime.timedelta(...),
+ 'uptime': datetime.timedelta(...)}
+
When we stop the reactor, the dispatcher also deactivates.
>>> reactor.callFromThread(reactor.stop)
@@ -298,11 +344,22 @@
... assert False, 'dispatcher did not deactivate'
...
+ >>> pprint.pprint(dispatcher.getStatusInfo()) # doctest: +ELLIPSIS
+ {'poll interval': datetime.timedelta(0, 0, 500000),
+ 'status': 'STOPPED',
+ 'time since last poll': None,
+ 'uptime': None,
+ 'uuid': UUID('...')}
+
The db's pool size has returned to the original value.
>>> db.getPoolSize()
7
+.. ......... ..
+.. Footnotes ..
+.. ......... ..
+
.. [#setUp]
>>> import ZODB.FileStorage
@@ -377,6 +434,24 @@
... assert False, 'job never completed'
...
+.. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
+ find this poll information also.
+
+ >>> dispatcher.getPollInfo(at=poll.key) is poll
+ True
+ >>> dispatcher.getPollInfo(at=poll.utc_timestamp) is poll
+ True
+ >>> dispatcher.getPollInfo(before=poll.key) is not poll
+ True
+ >>> dispatcher.getPollInfo(before=poll.utc_timestamp) is not poll
+ True
+ >>> dispatcher.getPollInfo(before=poll.key-16) is poll
+ True
+ >>> dispatcher.getPollInfo(
+ ... before=poll.utc_timestamp + datetime.timedelta(seconds=0.4)
+ ... ) is poll
+ True
+
.. [#wait_for_annotation]
>>> def wait_for_annotation(job, name):
Modified: zc.async/branches/dev/src/zc/async/instanceuuid.py
===================================================================
--- zc.async/branches/dev/src/zc/async/instanceuuid.py 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/instanceuuid.py 2008-04-02 03:11:14 UTC (rev 85059)
@@ -14,9 +14,9 @@
workers to connect to a single database to do work. The software
expects an instance home to only generate a single process.
-To get a new identifier for this software instance, delete this file and
-restart Zope (or more precisely, delete this file, restart Python, and
-import zc.async.instanceuuid). This file will be recreated with a new value.
+To get a new identifier for this software instance, delete this file,
+restart Python, and import zc.async.instanceuuid. This file will be
+recreated with a new value.
"""
zope.interface.classImplements(uuid.UUID, zc.async.interfaces.IUUID)
Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/job.py 2008-04-02 03:11:14 UTC (rev 85059)
@@ -18,6 +18,27 @@
import zc.async.interfaces
import zc.async.utils
+def _repr(obj):
+ if isinstance(obj, persistent.Persistent):
+ dbname = "?"
+ if obj._p_jar is not None:
+ dbname = getattr(obj._p_jar.db(), 'database_name', "?")
+ if dbname != '?':
+ dbname = repr(dbname)
+ if obj._p_oid is not None:
+ oid = ZODB.utils.u64(obj._p_oid)
+ else:
+ oid = '?'
+ return '%s.%s (oid %s, db %s)' % (
+ obj.__class__.__module__,
+ obj.__class__.__name__,
+ oid,
+ dbname)
+ elif isinstance(obj, types.FunctionType):
+ return '%s.%s' % (obj.__module__, obj.__name__)
+ else:
+ return repr(obj)
+
def success_or_failure(success, failure, res):
callable = None
if isinstance(res, twisted.python.failure.Failure):
@@ -53,7 +74,7 @@
_quota_names = ()
def __init__(self, *args, **kwargs):
- self.args = persistent.list.PersistentList(args)
+ self.args = persistent.list.PersistentList(args) # TODO: blist
self.callable = self.args.pop(0)
self.kwargs = persistent.mapping.PersistentMapping(kwargs)
self.callbacks = zc.queue.PersistentQueue()
@@ -167,6 +188,23 @@
res.args.insert(0, res)
return res
+ def __repr__(self):
+ try:
+ call = _repr(self._callable_root)
+ if self._callable_name is not None:
+ call += ' :' + self._callable_name
+ args = ', '.join(_repr(a) for a in self.args)
+ kwargs = ', '.join(k+"="+_repr(v) for k, v in self.kwargs.items())
+ if args:
+ if kwargs:
+ args += ", " + kwargs
+ else:
+ args = kwargs
+ return '<%s ``%s(%s)``>' % (_repr(self), call, args)
+ except (TypeError, ValueError, AttributeError):
+ # broken reprs are a bad idea; they obscure problems
+ return super(Job, self).__repr__()
+
@property
def callable(self):
if self._callable_name is None:
Modified: zc.async/branches/dev/src/zc/async/job.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/job.txt 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/job.txt 2008-04-02 03:11:14 UTC (rev 85059)
@@ -39,8 +39,15 @@
>>> import transaction
>>> transaction.commit()
-Now we have a job [#verify]_. Initially it has a NEW status.
+Now we have a job [#verify]_. The ``__repr__`` tries to be helpful, identifying
+the persistent object identifier ("oid") as an integer and the database ("db"),
+and trying to render the call.
+ >>> j # doctest: +ELLIPSIS
+ <zc.async.job.Job (oid ... db 'unnamed') ``zc.async.doctest_test.call()``>
+
+Initially it has a NEW status.
+
>>> import zc.async.interfaces
>>> j.status == zc.async.interfaces.NEW
True
@@ -75,7 +82,14 @@
>>> demo.counter
0
>>> j = root['j'] = zc.async.job.Job(demo.increase)
+ >>> j # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ <zc.async.job.Job (oid ?, db ?)
+ ``zc.async.doctest_test.Demo (oid ?, db ?) :increase()``>
+
>>> transaction.commit()
+ >>> j # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ <zc.async.job.Job (oid ..., db 'unnamed')
+ ``zc.async.doctest_test.Demo (oid ..., db 'unnamed') :increase()``>
>>> j() # result is None
>>> demo.counter
1
@@ -197,6 +211,10 @@
>>> j = root['j3'] = zc.async.job.Job(
... argCall, root['demo2'], value=4)
>>> transaction.commit()
+ >>> j # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ <zc.async.job.Job (oid ..., db 'unnamed')
+ ``zc.async.doctest_test.argCall(zc.async.doctest_test.PersistentDemo (oid ..., db 'unnamed'),
+ value=4)``>
>>> j()
>>> root['demo2'].value
4
@@ -274,7 +292,7 @@
... print "failure.", f
...
>>> j.addCallbacks(success, failure) # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job ...>
>>> res = j()
success! 15
@@ -283,7 +301,7 @@
>>> j = root['j'] = zc.async.job.Job(multiply, 5, None)
>>> transaction.commit()
>>> j.addCallbacks(success, failure) # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
>>> res = j() # doctest: +ELLIPSIS
failure. [Failure instance: Traceback: exceptions.TypeError...]
@@ -298,9 +316,9 @@
>>> j = root['j'] = zc.async.job.Job(multiply, 5, 3)
>>> transaction.commit()
>>> j.addCallbacks(success) # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
>>> j.addCallbacks(also_success) # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
>>> res = j()
success! 15
also a success! 15
@@ -308,9 +326,9 @@
>>> j = root['j'] = zc.async.job.Job(multiply, 5, None)
>>> transaction.commit()
>>> j.addCallbacks(failure=failure) # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
>>> j.addCallbacks(failure=also_failure) # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
>>> res = j() # doctest: +ELLIPSIS
failure. [Failure instance: Traceback: exceptions.TypeError...]
also a failure. [Failure instance: Traceback: exceptions.TypeError...]
@@ -346,7 +364,7 @@
>>> transaction.commit()
>>> j.addCallbacks(zc.async.job.Job(multiply, 4)
... ).addCallbacks(success) # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
>>> res = j()
success! 60
@@ -359,7 +377,7 @@
>>> transaction.commit()
>>> j.addCallbacks(
... failure=handle_failure).addCallbacks(success) # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
>>> res = j()
success! 0
@@ -536,7 +554,7 @@
>>> transaction.commit()
>>> j.args.append(j)
>>> res = j() # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
A class method on Job, ``bind``, can simplify this. It puts the job as
the first argument to the callable, as if the callable were bound as a method
@@ -545,7 +563,7 @@
>>> j = root['j'] = zc.async.job.Job.bind(show)
>>> transaction.commit()
>>> res = j() # doctest: +ELLIPSIS
- <zc.async.job.Job object at ...>
+ <zc.async.job.Job (oid ...>
Result and Status
-----------------
Added: zc.async/branches/dev/src/zc/async/monitor.py
===================================================================
--- zc.async/branches/dev/src/zc/async/monitor.py (rev 0)
+++ zc.async/branches/dev/src/zc/async/monitor.py 2008-04-02 03:11:14 UTC (rev 85059)
@@ -0,0 +1,291 @@
+import re
+import datetime
+import pytz
+import uuid
+import simplejson
+
+import zope.component
+
+import zc.async.dispatcher
+
+_marker = object()
+class Encoder(simplejson.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, datetime.timedelta):
+ tmp = {'days': obj.days,
+ 'hours': obj.seconds // (60*60),
+ 'minutes': (obj.seconds % (60*60)) // 60,
+ 'seconds': float(
+ obj.seconds % 60) + obj.microseconds/1000000
+ }
+ res = dict((k, v) for k, v in tmp.items() if v)
+ if not res:
+ res['seconds'] = 0
+ return res
+ # TODO the spelling of this conditional is to support our test setup
+ # shenanigans. originally was ``isinstance(obj, datetime.datetime)``.
+ # Would be nice to fix, though the duck typing is Pythonic at least.
+ elif (getattr(obj, 'tzinfo', _marker) is not _marker and
+ getattr(obj, 'astimezone', _marker) is not _marker):
+ if obj.tzinfo is not None:
+ obj = obj.astimezone(pytz.UTC).replace(tzinfo=None)
+ return obj.isoformat() + "Z"
+ elif isinstance(obj, uuid.UUID):
+ return str(obj)
+ return simplejson.JSONEncoder.default(self, obj)
+
+encoder = Encoder(sort_keys=True, indent=4)
+
+
+def status(uuid=None):
+ """Get general zc.async dispatcher information.
+
+ 'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED'."""
+ if uuid is not None:
+ uuid = uuid.UUID(uuid)
+ return encoder.encode(zc.async.dispatcher.get(uuid).getStatusInfo())
+
+def jobs(queue=None, agent=None, uuid=None):
+ """Show active jobs as of last poll, sorted from newest to oldest.
+
+ Usage:
+
+ jobs
+ (returns active jobs as of last poll, newest to oldest)
+
+ jobs queue:<queue name>
+ (jobs are filtered to those coming from the named queue)
+
+ jobs agent:<agent name>
+ (jobs are filtered to those coming from agents with given name)
+
+ "queue:" and "agent:" modifiers may be combined.
+
+ Example:
+
+ async jobs queue: agent:main
+ (results filtered to queue named '' and agent named 'main')"""
+ if uuid is not None:
+ uuid = uuid.UUID(uuid)
+ return encoder.encode(
+ zc.async.dispatcher.get(uuid).getActiveJobIds(queue, agent))
+
+def job(OID, database=None, uuid=None):
+ """Local information about a job as of last poll, if known.
+
+ Does not consult ZODB, but in-memory information."""
+ if uuid is not None:
+ uuid = uuid.UUID(uuid)
+ return encoder.encode(
+ zc.async.dispatcher.get(uuid).getJobInfo(OID, database))
+
+_find = re.compile(r'\d+[DHMS]').findall
+def _dt(s):
+ if s is None:
+ res = s
+ else:
+ try:
+ res = int(s)
+ except ValueError:
+ vals = {}
+ for val in _find(s.upper()):
+ vals[val[-1]] = int(val[:-1])
+ res = datetime.timedelta(
+ days=vals.get('D', 0),
+ hours=vals.get('H', 0),
+ minutes=vals.get('M', 0),
+ seconds=vals.get('S', 0)) + datetime.datetime.utcnow()
+ return res
+
+
+def jobstats(at=None, before=None, since=None, queue=None, agent=None,
+ uuid=None):
+ """Statistics on historical jobs as of last poll.
+
+ Usage:
+
+ jobstats
+ (returns statistics on historical jobs as of last poll)
+
+ jobstats queue:<queue name>
+ (statistics are filtered to those coming from the named queue)
+
+ jobstats agent:<agent name>
+ (statistics are filtered to those coming from agents with given name)
+
+ jobstats at:<poll key or interval>
+ (statistics are collected at or before the poll key or interval)
+
+ jobstats before:<pollkey or interval>
+ (statistics are collected before the poll key or interval)
+
+ jobstats since:<pollkey or interval>
+ (statistics are collected since poll key or interval, inclusive)
+
+ The modifiers "queue:", "agent:", "since:", and one of "at:" or "before:"
+ may be combined.
+
+ Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+ be replaced with a positive integer, and "D," "H," "M," and "S" are
+ literals standing for "days," "hours," "minutes," and "seconds."
+ For instance, you might use ``5M`` for five minutes, ``20S`` for
+ twenty seconds, or ``1H30M`` for an hour and a half.
+
+ Poll keys are the values shown as "key" from the ``poll`` or ``polls``
+ command.
+
+ Example:
+
+ async jobstats queue: agent:main since:1H
+ (results filtered to queue named '' and agent named 'main' from now
+ till one hour ago)"""
+ # TODO: parse since and before to datetimes
+ if uuid is not None:
+ uuid = uuid.UUID(uuid)
+ return encoder.encode(
+ zc.async.dispatcher.get(uuid).getStatistics(
+ _dt(at), _dt(before), _dt(since), queue, agent))
+
+def poll(at=None, before=None, uuid=None):
+ """Get information about a single poll, defaulting to most recent.
+
+ Usage:
+
+ poll
+ (returns most recent poll)
+
+ poll at:<poll key or interval>
+ (returns poll at or before the poll key or interval)
+
+ poll before:<poll key or interval>
+ (returns poll before the poll key or interval)
+
+ Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+ be replaced with a positive integer, and "D," "H," "M," and "S" are
+ literals standing for "days," "hours," "minutes," and "seconds."
+ For instance, you might use ``5M`` for five minutes, ``20S`` for
+ twenty seconds, or ``1H30M`` for an hour and a half.
+
+ Example:
+
+ async poll at:5M
+ (get the poll information at five minutes ago or before)"""
+ # TODO: parse at and before to datetimes
+ if uuid is not None:
+ uuid = uuid.UUID(uuid)
+ info = zc.async.dispatcher.get(uuid).getPollInfo(_dt(at), _dt(before))
+ res = {'key': info.key, 'time': info.utc_timestamp.isoformat() + "Z",
+ 'results': info}
+ return encoder.encode(res)
+
+def polls(at=None, before=None, since=None, count=None, uuid=None):
+ """Get information about recent polls, defaulting to most recent.
+
+ Usage:
+
+ polls
+ (returns most recent 3 poll)
+
+ polls at:<poll key or interval>
+ (returns up to 3 polls at or before the poll key or interval)
+
+ polls before:<poll key or interval>
+ (returns up to 3 polls before the poll key or interval)
+
+ polls since:<poll key or interval>
+ (returns polls since the poll key or interval, inclusive)
+
+ polls count <positive integer>
+ (returns the given number of the most recent files)
+
+ The modifiers "since:", "count:", and one of "at:" or "before:" may
+ be combined.
+
+ Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+ be replaced with a positive integer, and "D," "H," "M," and "S" are
+ literals standing for "days," "hours," "minutes," and "seconds."
+ For instance, you might use ``5M`` for five minutes, ``20S`` for
+ twenty seconds, or ``1H30M`` for an hour and a half.
+
+ Example:
+
+ async polls before:5M since:10M
+ (get the poll information from 5 to 10 minutes ago)"""
+ if uuid is not None:
+ uuid = uuid.UUID(uuid)
+ if count is None:
+ if since is None:
+ count = 3
+ else:
+ count = int(count)
+ return encoder.encode(
+ [{'key': p.key, 'time': p.utc_timestamp.isoformat() + "Z",
+ 'results': p}
+ for p in zc.async.dispatcher.get(uuid).iterPolls(
+ _dt(at), _dt(before), _dt(since), count)])
+
+# provide in async and separately:
+
+def utcnow():
+ """Return the current time in UTC, in ISO 8601 format."""
+ return datetime.datetime.utcnow().isoformat() + "Z"
+
+def UUID():
+ """Get instance UUID in hex."""
+ res = zope.component.getUtility(zc.async.interfaces.IUUID)
+ if res is not None:
+ return str(res)
+
+funcs = {}
+
+def help(cmd=None):
+ """Get help on an async monitor tool.
+
+ Usage is 'async help <tool name>' or 'async help'."""
+ if cmd is None:
+ res = [
+ "These are the tools available. Usage for each tool is \n"
+ "'async <tool name> [modifiers...]'. Learn more about each \n"
+ "tool using 'async help <tool name>'.\n"]
+ for nm, func in sorted(funcs.items()):
+ res.append('%s: %s' % (
+ nm, func.__doc__.split('\n', 1)[0]))
+ return '\n'.join(res)
+ f = funcs.get(cmd)
+ if f is None:
+ return 'Unknown async tool'
+ return f.__doc__
+
+for f in status, jobs, job, jobstats, poll, polls, utcnow, UUID, help:
+ funcs[f.__name__] = f
+
+def async(connection, cmd=None, *raw):
+ """A collection of tools to monitor zc.async activity in this process.
+
+ To see a list of async tools, use 'async help'.
+
+ To learn more about an async monitor tool, use 'async help <tool name>'."""
+ if cmd is None:
+ res = async.__doc__
+ else:
+ f = funcs.get(cmd)
+ if f is None:
+ res = '[Unknown async tool]'
+ else:
+ args = []
+ kwargs = {}
+ for val in raw:
+ if ':' in val:
+ key, val = val.split(':', 1)
+ kwargs[key] = val
+ else:
+ if kwargs:
+ raise ValueError(
+ 'placeful modifiers must come before named '
+ 'modifiers')
+ args.append(val)
+ res = f(*args, **kwargs)
+ connection.write(res)
+ connection.write('\n')
+
+
Modified: zc.async/branches/dev/src/zc/async/monitor.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/monitor.txt 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/monitor.txt 2008-04-02 03:11:14 UTC (rev 85059)
@@ -1,191 +1,377 @@
-THIS SHOULD BE ONLY RUN WITH A CUSTOM SET UP. BASIC zc.async SHOULD NOT
-DEPEND ON Z3MONITOR
+Monitoring Dispatchers
+======================
-Monitoring Agents
-=================
+A process's zc.async dispatcher [#setUp]_ can be monitored in-process via
+zc.z3monitor plugins. Let's imagine we have a connection over which we
+can send text messages to the monitor server [#z3monitor_setup]_.
-Agents can be monitored in-process via zc.z3monitor plugins. Let's imagine
-we have a connection over which we can send text messages to the monitor
-server [#z3monitor_setup]_.
-
All monitoring is done through the ``async`` command. Here is its
description, using the zc.z3monitor ``help`` command.
>>> connection.test_input('help async\n')
+ Help for async:
+ <BLANKLINE>
A collection of tools to monitor zc.async activity in this process.
-
- To learn more about an async monitor tool, use 'async help <tool name>'.
-
- These are the tools available. Usage for each tool is
- 'async <tool name> [arguments...]'.
-
- help <tool name>: Get help on the named tool.
- agents [poll_timeout]: Get information about agents in this process.
- jobs: Get information about current jobs.
- jobhistory [time]: Get historical statistics of jobs.
- agent <agent name>: Get detailed information about a given agent.
- agentjobs <agent name>: Get detailed information about the agent's jobs.
- job <oid> [database name]: Get information about a job.
- log <agent name> count: Get up to 100 of the last log entries for a given agent.
+ <BLANKLINE>
+ To see a list of async tools, use 'async help'.
+ <BLANKLINE>
+ To learn more about an async monitor tool, use 'async help <tool name>'.
-> CLOSE
As you can see, you use ``async help`` to get more information about each
async-specific command.
>>> connection.test_input('async help\n')
- Get help on an async monitor tool.
-
- Usage is 'async help <tool name>'. 'async help' (or 'async help help')
- returns this message.
-
- These are the tools available.
-
- help <tool name>: Get help on the named tool.
- agents [poll_timeout]: Get information about agents in this process.
- jobs: Get information about jobs
- jobhistory [time]: Get information about job history.
- agent <agent name>: Get information about a given agent.
- agentjobs <agent name>: Get information about the agent's jobs.
- agentjobhistory <agent name> [time]: Statistics on agent's job history.
- job <oid> [database name]: Get information about a job.
+ These are the tools available. Usage for each tool is
+ 'async <tool name> [modifiers...]'. Learn more about each
+ tool using 'async help <tool name>'.
+ <BLANKLINE>
+ UUID: Get instance UUID in hex.
+ help: Get help on an async monitor tool.
+ job: Local information about a job as of last poll, if known.
+ jobs: Show active jobs as of last poll, sorted from newest to oldest.
+ jobstats: Statistics on historical jobs as of last poll.
+ poll: Get information about a single poll, defaulting to most recent.
+ polls: Get information about recent polls, defaulting to most recent.
+ status: Get general zc.async dispatcher information.
+ utcnow: Return the current time in UTC, in ISO 8601 format.
-> CLOSE
-Let's get the help for each async command now. The ``agents`` command is
-the broadest summary.
+Let's give a quick run through these for an overview, and then we'll dig in
+just a bit.
- >>> connection.test_input('async help agents\n')
- Get information about agents in this process.
-
- Usage: async agents [poll_timeout]
-
- If ``poll_timeout`` is not provided, it defaults to 10 (seconds).
-
- Returns this information, an element per line:
-
- - the UID of this process;
- - the total number of registered agents;
- - the names of registered agents;
- - the number of running agents;
- - the names of running agents;
- - the number of agents that have not polled within the past poll_timeout
- seconds;
- - the names of agents that have not polled within the last poll_timeout
- seconds;
+The ``UUID`` command returns the instance's UUID.
+
+ >>> connection.test_input('async help UUID\n')
+ Get instance UUID in hex.
-> CLOSE
-The ``jobs`` command gives a summary of the current active jobs for all
-agents.
+ >>> connection.test_input('async UUID\n')
+ d10f43dc-ffdf-11dc-abd4-0017f2c49bdd
+ -> CLOSE
- >>> connection.test_input('async help jobs')
- Get information about jobs
-
- Usage: async jobs
-
- Returns this information, an element per line:
-
- - the total number of active jobs with a connection in all agents;
- - the total number of active jobs in all agents;
- - the time in seconds of the oldest active job, or 0;
- - the name of the agent with the oldest active job, or an empty line;
- - the oid, database_name of the oldest active job, or an empty line;
- - the time in seconds of the oldest active job without a connection, or 0;
- - the name of the agent with the oldest active job without a connection,
- or an empty line;
- - the oid, database_name of the oldest active job without a connection,
- or an empty line;
+The ``utcnow`` command returns the current time in UTC. This can be
+convenient for deciphering the meaning of UTC datetimes returned from other
+commands.
+
+ >>> connection.test_input('async help utcnow\n')
+ Return the current time in UTC, in ISO 8601 format.
-> CLOSE
-The ``jobhistory`` command gives information on the past jobs run by all
-agents.
+ >>> connection.test_input('async utcnow\n')
+ 2006-08-10T15:44:23.000211Z
+ -> CLOSE
- >>> connection.test_input('async help jobhistory')
- Get historical statistics of jobs.
-
- Usage: async jobhistory [time]
-
- If ``time`` is not provided, returns information since process started.
-
- Otherwise, ``time`` should be any of the following:
- [1-...]d
- [1-23]h
- [1-59]m
- where "d" indicates day or days, "h" hour or hours, and "m" minute or
- minutes.
-
- Returns this information, an element per line:
+The ``status`` command is the first of the "serious" monitoring
+commands. As such, it starts some patterns that the rest of the
+commands will follow.
- - the total number of jobs started by all registered agents in this
- process, in time period if given;
- - the total number of jobs completed by all registered agents in this
- process, in time period if given;
- - the total number of jobs completed by all registered agents in this
- process that ended in a Failure, in time period if given;
- - the name of the agent with the most completed jobs in this process, in
- time period if given;
- - the name of the agent with the highest total number of failures in this
- process, in time period if given;
- - the name of the agent with the most started jobs in this process, in
- time period if given.
+- output is pretty-printed JSON
+
+- durations are in a dict of keys 'days', 'hours', 'minutes', and 'seconds',
+ with all as ints except seconds, which is a float.
+
+ >>> connection.test_input('async help status\n')
+ Get general zc.async dispatcher information.
+ <BLANKLINE>
+ 'status' is one of 'STUCK', 'STARTING', 'RUNNING', or 'STOPPED'.
-> CLOSE
-The ``agent`` tool gives information specific to an individual agent.
+ >>> connection.test_input('async status\n')
+ {
+ "poll interval": {
+ "seconds": 5.0
+ },
+ "status": "RUNNING",
+ "time since last poll": {
+ "seconds": 1.0
+ },
+ "uptime": {
+ "seconds": 1.0
+ },
+ "uuid": "d10f43dc-ffdf-11dc-abd4-0017f2c49bdd"
+ }
+ -> CLOSE
- >>> connection.test_input('async help agent\n')
- Get information about a given agent.
-
- Usage: async agent <agent name> [time]
-
- If ``time`` is not provided, returns information since process started.
-
- Otherwise, ``time`` should be any of the following:
- [1-...]d
- [1-23]h
- [1-59]m
- where "d" indicates day or days, "h" hour or hours, and "m" minute or
- minutes.
-
- Returns this information, an element per line:
-
- - number of seconds since last poll;
- - total number of worker threads/connections for this agent;
- - number of active jobs with a connection for this agent;
- - number of active jobs for this agent;
- - the time in seconds of the oldest active job, or 0;
- - the time in seconds of the oldest active job without a connection, or 0;
- - the oid, database_name of the oldest active job, or an empty line;
- - the oid, database_name of the oldest active job without a connection,
- or an empty line;
- - the total number of jobs started by this agent since process start or
- in time period if given;
- - the total number of jobs completed by this agent since process start or
- in time period if given;
- - the total number of jobs completed by this agent since process start or
- in time period if given that ended in a Failure.
+Here's the ``jobs`` command. It introduces some new patterns.
+
+- some command modifiers are available as <modifier>:<value>
+
+- several commands have the "queue:" and "agent:" modifiers.
+
+ >>> connection.test_input('async help jobs\n')
+ Show active jobs as of last poll, sorted from newest to oldest.
+ <BLANKLINE>
+ Usage:
+ <BLANKLINE>
+ jobs
+ (returns active jobs as of last poll, newest to oldest)
+ <BLANKLINE>
+ jobs queue:<queue name>
+ (jobs are filtered to those coming from the named queue)
+ <BLANKLINE>
+ jobs agent:<agent name>
+ (jobs are filtered to those coming from agents with given name)
+ <BLANKLINE>
+ "queue:" and "agent:" modifiers may be combined.
+ <BLANKLINE>
+ Example:
+ <BLANKLINE>
+ async jobs queue: agent:main
+ (results filtered to queue named '' and agent named 'main')
-> CLOSE
-The ``log`` tool gives up to 100 of the last log entries of a given agent.
+ >>> connection.test_input('async jobs\n')
+ []
+ -> CLOSE
- >>> connection.test_input('async help log')
- Get up to 100 of the last log entries for a given agent.
-
- Usage: async log <agent name> [count]
- -> CLOSE
+The ``jobstats`` command analyzes past polls and job information to come up
+with some potentially useful statistics. It includes the optional "queue:" and
+"agent:" modifiers. It also shows some new patterns.
-.. [#z3monitor_setup]
+- datetimes are in UTC, in ISO 8601 format.
+- The "at:", "before:" and "since:" modifiers are intervals, or poll keys.
+
+- "at:" and "before:" may not be combined.
+
+ >>> connection.test_input('async help jobstats\n')
+ Statistics on historical jobs as of last poll.
+ <BLANKLINE>
+ Usage:
+ <BLANKLINE>
+ jobstats
+ (returns statistics on historical jobs as of last poll)
+ <BLANKLINE>
+ jobstats queue:<queue name>
+ (statistics are filtered to those coming from the named queue)
+ <BLANKLINE>
+ jobstats agent:<agent name>
+ (statistics are filtered to those coming from agents with given name)
+ <BLANKLINE>
+ jobstats at:<poll key or interval>
+ (statistics are collected at or before the poll key or interval)
+ <BLANKLINE>
+ jobstats before:<pollkey or interval>
+ (statistics are collected before the poll key or interval)
+ <BLANKLINE>
+ jobstats since:<pollkey or interval>
+ (statistics are collected since poll key or interval, inclusive)
+ <BLANKLINE>
+ The modifiers "queue:", "agent:", "since:", and one of "at:" or "before:"
+ may be combined.
+ <BLANKLINE>
+ Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+ be replaced with a positive integer, and "D," "H," "M," and "S" are
+ literals standing for "days," "hours," "minutes," and "seconds."
+ For instance, you might use ``5M`` for five minutes, ``20S`` for
+ twenty seconds, or ``1H30M`` for an hour and a half.
+ <BLANKLINE>
+ Poll keys are the values shown as "key" from the ``poll`` or ``polls``
+ command.
+ <BLANKLINE>
+ Example:
+ <BLANKLINE>
+ async jobstats queue: agent:main since:1H
+ (results filtered to queue named '' and agent named 'main' from now
+ till one hour ago)
+ -> CLOSE
+
+ >>> connection.test_input('async jobstats\n')
+ {
+ "failed": 0,
+ "longest active": null,
+ "longest failed": null,
+ "longest successful": null,
+ "shortest active": null,
+ "shortest failed": null,
+ "shortest successful": null,
+ "started": 0,
+ "statistics end": "2006-08-10T15:44:22.000211Z",
+ "statistics start": "2006-08-10T15:44:22.000211Z",
+ "successful": 0,
+ "unknown": 0
+ }
+ -> CLOSE
+
+The ``poll`` command uses patterns we've seen above.
+
+ >>> connection.test_input('async help poll\n')
+ Get information about a single poll, defaulting to most recent.
+ <BLANKLINE>
+ Usage:
+ <BLANKLINE>
+ poll
+ (returns most recent poll)
+ <BLANKLINE>
+ poll at:<poll key or interval>
+ (returns poll at or before the poll key or interval)
+ <BLANKLINE>
+ poll before:<poll key or interval>
+ (returns poll before the poll key or interval)
+ <BLANKLINE>
+ Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+ be replaced with a positive integer, and "D," "H," "M," and "S" are
+ literals standing for "days," "hours," "minutes," and "seconds."
+ For instance, you might use ``5M`` for five minutes, ``20S`` for
+ twenty seconds, or ``1H30M`` for an hour and a half.
+ <BLANKLINE>
+ Example:
+ <BLANKLINE>
+ async poll at:5M
+ (get the poll information at five minutes ago or before)
+ -> CLOSE
+
+ >>> connection.test_input('async poll\n')
+ {
+ "key": 6420106068108777167,
+ "results": {
+ "": {}
+ },
+ "time": "2006-08-10T15:44:22.000211Z"
+ }
+ -> CLOSE
+
+``polls`` does too.
+
+ >>> connection.test_input('async help polls\n')
+ Get information about recent polls, defaulting to most recent.
+ <BLANKLINE>
+ Usage:
+ <BLANKLINE>
+ polls
+ (returns most recent 3 poll)
+ <BLANKLINE>
+ polls at:<poll key or interval>
+ (returns up to 3 polls at or before the poll key or interval)
+ <BLANKLINE>
+ polls before:<poll key or interval>
+ (returns up to 3 polls before the poll key or interval)
+ <BLANKLINE>
+ polls since:<poll key or interval>
+ (returns polls since the poll key or interval, inclusive)
+ <BLANKLINE>
+ polls count <positive integer>
+ (returns the given number of the most recent files)
+ <BLANKLINE>
+ The modifiers "since:", "count:", and one of "at:" or "before:" may
+ be combined.
+ <BLANKLINE>
+ Intervals are of the format ``[nD][nH][nM][nS]``, where "n" should
+ be replaced with a positive integer, and "D," "H," "M," and "S" are
+ literals standing for "days," "hours," "minutes," and "seconds."
+ For instance, you might use ``5M`` for five minutes, ``20S`` for
+ twenty seconds, or ``1H30M`` for an hour and a half.
+ <BLANKLINE>
+ Example:
+ <BLANKLINE>
+ async polls before:5M since:10M
+ (get the poll information from 5 to 10 minutes ago)
+ -> CLOSE
+
+ >>> connection.test_input('async polls\n')
+ [
+ {
+ "key": 6420106068108777167,
+ "results": {
+ "": {}
+ },
+ "time": "2006-08-10T15:44:22.000211Z"
+ }
+ ]
+ -> CLOSE
+
+.. [#setUp] See the discussion in other documentation for an explanation of this code.
+
+ >>> from zc.twist import transactionManager, connection
+ >>> import zope.component
+ >>> zope.component.provideAdapter(transactionManager)
+ >>> zope.component.provideAdapter(connection)
+ >>> import ZODB.interfaces
+ >>> zope.component.provideAdapter(
+ ... transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+ >>> import zope.component
+ >>> import types
+ >>> import zc.async.interfaces
+ >>> import zc.async.job
+ >>> zope.component.provideAdapter(
+ ... zc.async.job.Job,
+ ... adapts=(types.FunctionType,),
+ ... provides=zc.async.interfaces.IJob)
+ >>> zope.component.provideAdapter(
+ ... zc.async.job.Job,
+ ... adapts=(types.MethodType,),
+ ... provides=zc.async.interfaces.IJob)
+ ...
+
+ >>> import zc.async.testing
+ >>> reactor = zc.async.testing.Reactor()
+ >>> reactor.start() # this monkeypatches datetime.datetime.now
+
+ >>> import ZODB.FileStorage
+ >>> storage = ZODB.FileStorage.FileStorage(
+ ... 'zc_async.fs', create=True)
+ >>> from ZODB.DB import DB
+ >>> db = DB(storage)
+ >>> conn = db.open()
+ >>> root = conn.root()
+
+ >>> import zc.async.queue
+ >>> import zc.async.interfaces
+ >>> mapping = root[zc.async.interfaces.KEY] = zc.async.queue.Queues()
+ >>> queue = mapping[''] = zc.async.queue.Queue()
+ >>> import transaction
+ >>> transaction.commit()
+
+ >>> from zc.async.instanceuuid import UUID
+ >>> import zope.component
+ >>> zope.component.provideUtility(
+ ... UUID, zc.async.interfaces.IUUID, '')
+
+ >>> import zc.async.dispatcher
+ >>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
+ >>> dispatcher.activate()
+ >>> reactor.time_flies(1)
+ 1
+
+ >>> import zc.async.agent
+ >>> agent = zc.async.agent.Agent()
+ >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
+ >>> transaction.commit()
+
+ >>> import time
+ >>> def wait_for(*jobs, **kwargs):
+ ... reactor.time_flies(dispatcher.poll_interval) # starts thread
+ ... # now we wait for the thread
+ ... for i in range(kwargs.get('attempts', 10)):
+ ... while reactor.time_passes():
+ ... pass
+ ... transaction.begin()
+ ... for j in jobs:
+ ... if j.status != zc.async.interfaces.COMPLETED:
+ ... break
+ ... else:
+ ... break
+ ... time.sleep(0.1)
+ ... else:
+ ... print 'TIME OUT'
+ ...
+
+.. [#z3monitor_setup] This part actually sets up the monitoring.
+
>>> import zc.ngi.testing
>>> import zc.z3monitor
>>> connection = zc.ngi.testing.TextConnection()
>>> server = zc.z3monitor.Server(connection)
- >>> import zc.async.z3monitor
- >>> import zope.component, zc.z3monitor.interfaces
+ >>> import zc.async.monitor
+ >>> import zope.component
+ >>> import zc.z3monitor.interfaces
>>> zope.component.provideUtility(
- ... zc.async.z3monitor.agent,
+ ... zc.async.monitor.async,
... zc.z3monitor.interfaces.IZ3MonitorPlugin,
- ... 'agent')
- >>> zope.component.provideUtility(
- ... zc.async.z3monitor.agents,
- ... zc.z3monitor.interfaces.IZ3MonitorPlugin,
- ... 'agents')
+ ... 'async')
+ >>> zope.component.provideUtility(zc.z3monitor.help,
+ ... zc.z3monitor.interfaces.IZ3MonitorPlugin, 'help')
Modified: zc.async/branches/dev/src/zc/async/testing.py
===================================================================
--- zc.async/branches/dev/src/zc/async/testing.py 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/testing.py 2008-04-02 03:11:14 UTC (rev 85059)
@@ -41,6 +41,8 @@
def argh(*args, **kwargs):
return _datetime(*args, **kwargs)
+_datetime.max = _datetime(*old_datetime.max.__reduce__()[1])
+
def setUpDatetime():
datetime.datetime = _datetime
set_now(datetime.datetime(2006, 8, 10, 15, 44, 22, 211, pytz.UTC))
Modified: zc.async/branches/dev/src/zc/async/utils.py
===================================================================
--- zc.async/branches/dev/src/zc/async/utils.py 2008-04-01 18:33:46 UTC (rev 85058)
+++ zc.async/branches/dev/src/zc/async/utils.py 2008-04-02 03:11:14 UTC (rev 85059)
@@ -15,9 +15,9 @@
return getattr(self._data, name)(*args, **kwargs)
return wrapper
-log = logging.getLogger('zc.async')
+log = logging.getLogger('zc.async.events')
+tracelog = logging.getLogger('zc.async.trace')
-
class Base(persistent.Persistent):
_z_parent__ = parent = None
@@ -112,6 +112,9 @@
def __init__(self, period, buckets):
self._data = zope.bforest.periodic.LOBForest(period, count=buckets)
+ def clear(self):
+ self._data.clear()
+
@property
def period(self):
return self._data.period
Added: zc.async/branches/dev/src/zc/async/z3tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/z3tests.py (rev 0)
+++ zc.async/branches/dev/src/zc/async/z3tests.py 2008-04-02 03:11:14 UTC (rev 85059)
@@ -0,0 +1,26 @@
+import os
+import unittest
+from zope.testing import doctest, module
+
+import zc.async.tests
+
+def setUp(test):
+ zc.async.tests.modSetUp(test)
+ # make the uuid stable for these tests
+ f = open(os.path.join(
+ os.environ["INSTANCE_HOME"], 'etc', 'uuid.txt'), 'w')
+ f.writelines(('d10f43dc-ffdf-11dc-abd4-0017f2c49bdd',)) # ...random...
+ f.close()
+ zc.async.instanceuuid.UUID = zc.async.instanceuuid.getUUID()
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocFileSuite(
+ 'monitor.txt',
+ setUp=setUp, tearDown=zc.async.tests.modTearDown,
+ optionflags=doctest.INTERPRET_FOOTNOTES),
+ ))
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
More information about the Checkins
mailing list