[Checkins] SVN: zc.async/branches/dev/s convert to new,
smaller-pickle zc.twist.Failure;
save much shorter period of job and poll info in memory;
only write poll logs if first or change;
delineate trace logs into debug (poll), info (successful job
completion), and error (job failure);
include telnetlib example in README_3, which in fact exposed a
documentation bug, now fixed;
use newer zc.twist and now-with-less-bugs zope.bforest.
Gary Poster
gary at zope.com
Wed Apr 9 16:43:37 EDT 2008
Log message for revision 85201:
convert to new, smaller-pickle zc.twist.Failure; save much shorter period of job and poll info in memory; only write poll logs if first or change; delineate trace logs into debug (poll), info (successful job completion), and error (job failure); include telnetlib example in README_3, which in fact exposed a documentation bug, now fixed; use newer zc.twist and now-with-less-bugs zope.bforest.
Changed:
U zc.async/branches/dev/setup.py
U zc.async/branches/dev/src/zc/async/README.txt
U zc.async/branches/dev/src/zc/async/README_3.txt
U zc.async/branches/dev/src/zc/async/dispatcher.py
U zc.async/branches/dev/src/zc/async/job.py
U zc.async/branches/dev/src/zc/async/job.txt
U zc.async/branches/dev/src/zc/async/monitor.py
U zc.async/branches/dev/src/zc/async/utils.py
U zc.async/branches/dev/src/zc/async/z3tests.py
-=-
Modified: zc.async/branches/dev/setup.py
===================================================================
--- zc.async/branches/dev/setup.py 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/setup.py 2008-04-09 20:43:37 UTC (rev 85201)
@@ -20,9 +20,11 @@
'uuid',
'zc.queue',
'zc.dict>=1.2.1',
- 'zc.twist>=1.1',
- 'Twisted>=8.0.1', # 8.0 was setuptools compatible
- 'zope.bforest>=1.1',
+ 'zc.twist>=1.2',
+ 'Twisted>=8.0.1', # 8.0 was setuptools compatible, 8.0.1 had bugfixes.
+ # note that Twisted builds with warnings, at least with py2.4. It
+ # seems to still build ok.
+ 'zope.bforest>=1.1.1',
'zope.component',
'zope.i18nmessageid',
'zope.interface',
Modified: zc.async/branches/dev/src/zc/async/README.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README.txt 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/README.txt 2008-04-09 20:43:37 UTC (rev 85201)
@@ -313,9 +313,9 @@
completion, either to success or an exception.
- You can look at the result of the call (once COMPLETED). It might be
- the result you expect, or a twisted.python.failure.Failure, which is a
- way to safely communicate exceptions across connections and machines
- and processes.
+ the result you expect, or a zc.twist.Failure, a subclass of
+ twisted.python.failure.Failure that safely communicates exceptions
+ across connections, machines, and processes.
-------
Results
@@ -389,7 +389,7 @@
>>> wait_for(job)
>>> t = transaction.begin()
>>> job.result
- <twisted.python.failure.Failure exceptions.NameError>
+ <zc.twist.Failure exceptions.NameError>
Failures can provide useful information such as tracebacks.
@@ -442,7 +442,7 @@
>>> transaction.commit()
>>> wait_for(job)
>>> job.result
- <twisted.python.failure.Failure exceptions.NameError>
+ <zc.twist.Failure exceptions.NameError>
>>> callback1.result
'I handled a name error'
>>> callback2.result
@@ -1064,7 +1064,7 @@
>>> transaction.commit()
>>> wait_for(job)
>>> job.result
- <twisted.python.failure.Failure zc.async.interfaces.AbortedError>
+ <zc.twist.Failure zc.async.interfaces.AbortedError>
>>> import sys
>>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
@@ -1135,12 +1135,19 @@
'shortest active': None,
'shortest failed': ('\x00\...', 'unnamed'),
'shortest successful': ('\x00...', 'unnamed'),
- 'started': 12,
- 'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
- 'statistics start': datetime.datetime(...),
- 'successful': 10,
+ 'started': 10,
+ 'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
+ 'statistics start': datetime.datetime(2006, 8, 10, 15, 56, 52, 211),
+ 'successful': 8,
'unknown': 0}
+ Although, wait a second--the 'statistics end', the 'started', and the
+ 'successful' values have changed! Why?
+
+ To keep memory from growing out of control, the dispatcher by default
+ keeps only 10 to 12.5 minutes' worth of poll information in memory. For
+ anything older, keep logs and look at them (...and rotate them!).
+
The ``getActiveJobIds`` list shows the new task--which is completed, but
not as of the last poll, so it's still in the list.
@@ -1172,9 +1179,9 @@
'shortest active': None,
'shortest failed': ('\x00\...', 'unnamed'),
'shortest successful': ('\x00...', 'unnamed'),
- 'started': 24,
- 'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
+ 'started': 22,
+ 'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
'statistics start': datetime.datetime(2006, 8, 10, 15, 57, 47, 211),
- 'successful': 22,
+ 'successful': 20,
'unknown': 0}
>>> reactor.stop()
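The "10 to 12.5 minutes" figure in the README follows from the new `Periodic(period=10 minutes, buckets=5)` setting in dispatcher.py, and the jobs store uses 20 minutes with 9 buckets. A minimal sketch of that arithmetic, assuming each bucket spans period / (buckets - 1) and the oldest bucket is dropped only when a new one is started (an assumption about `zc.async.utils.Periodic` and zope.bforest's periodic forests that matches the "max of 12.5 min" and "max of 22.5 min" comments). `retention_window` and `estimate_bytes` are hypothetical helpers; the byte figures are the dispatcher comment's own rough guesses.

```python
import datetime


def retention_window(period, buckets):
    """Return (min, max) age of data kept by a rotating-bucket store.

    Assumption: the newest `buckets - 1` buckets together cover `period`,
    so each bucket spans period / (buckets - 1). Just before a rotation,
    the oldest bucket adds one extra bucket's worth of history.
    """
    bucket_span = period / (buckets - 1)
    return period, period + bucket_span


def estimate_bytes(window_minutes, events_per_minute, bytes_per_event):
    """Rough memory estimate: events kept in the window times size each."""
    return window_minutes * events_per_minute * bytes_per_event


# Polls: 10 minutes, 5 buckets -> kept for 10 to 12.5 minutes.
print(retention_window(datetime.timedelta(minutes=10), 5))
# Jobs: 20 minutes, 9 buckets -> kept for 20 to 22.5 minutes.
print(retention_window(datetime.timedelta(minutes=20), 9))

# The dispatcher comment's numbers: 12 polls/minute at ~300 bytes for
# 10 minutes, and ~1K per job for 20 minutes at 12 or 60 jobs/minute.
print(estimate_bytes(10, 12, 300))   # 36000, "about 36K"
print(estimate_bytes(20, 12, 1000))  # 240000, "240K"
print(estimate_bytes(20, 60, 1000))  # 1200000, "1.2M"
```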
Modified: zc.async/branches/dev/src/zc/async/README_3.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README_3.txt 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/README_3.txt 2008-04-09 20:43:37 UTC (rev 85201)
@@ -99,7 +99,7 @@
zdaemon.conf::
<environment>
- ZC_ASYNC_UUID /Users/gary/opt/classifieds/parts/classifieds/uuid.txt
+ ZC_ASYNC_UUID /path/to/uuid.txt
</environment>
(Other tools, such as supervisor, also can work, of course; their spellings are
@@ -117,6 +117,7 @@
... >
... <include package="zope.component" file="meta.zcml" />
... <include package="zope.component" />
+ ... <include package="zc.z3monitor" />
... <include package="zc.async" file="multidb_dispatcher_policy.zcml" />
...
... <!-- this is usually handled in Zope applications by the
@@ -161,11 +162,29 @@
>>> wait_for_result(job)
42
-TODO look at files, maybe even use telnetlib to show that you can get over to
-the monitor port, for amusement.
+We can connect to the monitor server with telnet.
-Now we'll "shut down" with a CTRL-C, or SIGINT.
+ >>> import telnetlib
+ >>> tn = telnetlib.Telnet('127.0.0.1', monitor_port)
+ >>> tn.write('async status\n') # immediately disconnects
+ >>> print tn.read_all() # doctest: +ELLIPSIS
+ {
+ "poll interval": {
+ "seconds": ...
+ },
+ "status": "RUNNING",
+ "time since last poll": {
+ "seconds": ...
+ },
+ "uptime": {
+ "seconds": ...
+ },
+ "uuid": "..."
+ }
+ <BLANKLINE>
+Now we'll "shut down" with a CTRL-C, or SIGINT, and clean up.
+
>>> import signal
>>> if getattr(os, 'getpid', None) is not None: # UNIXEN, not Windows
... pid = os.getpid()
@@ -194,8 +213,6 @@
>>> bool(da.activated)
False
-Now we'll clean up.
-
>>> db.close()
>>> db.databases['async'].close()
>>> import shutil
@@ -260,7 +277,7 @@
>>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
>>> import time
- >>> def get_poll(count = None):
+ >>> def get_poll(count=None): # just a helper used later, not processing
... if count is None:
... count = len(dispatcher.polls)
... for i in range(30):
@@ -271,7 +288,7 @@
... assert False, 'no poll!'
...
- >>> def wait_for_result(job):
+ >>> def wait_for_result(job): # just a helper used later, not processing
... for i in range(30):
... t = transaction.begin()
... if job.status == zc.async.interfaces.COMPLETED:
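The README_3 doctest uses Python 2's telnetlib, which was removed from the standard library in Python 3.13. Because the monitor answers `async status` and then closes the connection, the same exchange can be done with a plain socket. A minimal sketch, assuming the server accepts one newline-terminated command and replies with JSON text shaped like the output above; `query_monitor`, `parse_status`, and `async_status` are hypothetical helpers, not part of zc.async or zc.z3monitor.

```python
import json
import socket


def query_monitor(host, port, command, timeout=5.0):
    """Send one command line to a monitor port and return the full reply.

    Assumes the server closes the connection after replying, as the
    README_3 example notes for ``async status``.
    """
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(command.encode("utf-8") + b"\n")
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:  # peer closed the connection: reply is complete
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8")


def parse_status(reply):
    """Decode the status reply, assumed to be a JSON object."""
    return json.loads(reply)


def async_status(host, port):
    """Ask the monitor for ``async status`` and return it as a dict."""
    return parse_status(query_monitor(host, port, "async status"))
```

With a running dispatcher, `async_status('127.0.0.1', monitor_port)['status']` would then be the same "RUNNING" value the doctest prints.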
Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py 2008-04-09 20:43:37 UTC (rev 85201)
@@ -228,10 +228,24 @@
# None at some point, to default to the installed Twisted reactor.
self.poll_interval = poll_interval
self.UUID = uuid
+ # we keep these small so that memory usage doesn't balloon too big.
+ # for polls, about 10 minutes at 5 seconds a poll with a fairly large
+ # poll size of maybe 300 bytes means 12 polls/minute, or 120 polls,
+ # * 300 == 36000, about 36K. Not too bad. Jobs can take much more
+ # memory depending on the result--a failure takes a lot of memory, for
+ # instance--and there's no real way to guess how many we would get in
+ # a given period of time. With a wild guess of an average of a K per
+ # job, and storage of 20 minutes, we would get 240K for 12 jobs a
+ # minute, or 1.2M for a job a second, and so on. That's much bigger,
+ # but we still have a long way to go before we have noticeable memory
+ # consumption on typical production machines.
+ # We keep jobs longer than polls because you may want to find out
+ # about active jobs in a given poll, and jobs will begin their
+ # timeout period when they are begun, so we give a bit of cushion.
self.polls = zc.async.utils.Periodic(
- period=datetime.timedelta(hours=2), buckets=3)
+ period=datetime.timedelta(minutes=10), buckets=5) # max of 12.5 min
self.jobs = zope.bforest.periodic.OOBForest(
- period=datetime.timedelta(hours=3), count=4)
+ period=datetime.timedelta(minutes=20), count=9) # max of 22.5 min
self._activated = set()
self.queues = {}
self.dead_pools = []
@@ -250,8 +264,7 @@
self.UUID, agent.name, agent._p_oid,
agent.queue.name,
agent.queue._p_oid, exc_info=True)
- return zc.twist.sanitize(
- twisted.python.failure.Failure())
+ return zc.twist.Failure()
res = self._commit(
'Error trying to commit getting a job for UUID %s from '
'agent %s (oid %s) in queue %s (oid %s)' % (
@@ -275,8 +288,7 @@
'Repeated transaction error trying to commit in '
'zc.async: %s',
debug_string, exc_info=True)
- return zc.twist.sanitize(
- twisted.python.failure.Failure())
+ return zc.twist.Failure()
retry += 1
except zc.twist.EXPLOSIVE_ERRORS:
transaction.abort()
@@ -286,8 +298,7 @@
zc.async.utils.log.error(
'Error trying to commit: %s',
debug_string, exc_info=True)
- return zc.twist.sanitize(
- twisted.python.failure.Failure())
+ return zc.twist.Failure()
else:
break
@@ -348,8 +359,7 @@
except zc.twist.EXPLOSIVE_ERRORS:
raise
except:
- agent_info['error'] = zc.twist.sanitize(
- twisted.python.failure.Failure())
+ agent_info['error'] = zc.twist.Failure()
transaction.abort()
continue
pool = pools.get(name)
@@ -426,11 +436,16 @@
self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
finally:
transaction.abort()
+ try:
+ last = self.polls.first()
+ except ValueError:
+ last = None
self.polls.add(poll_info)
for info in started_jobs:
info['poll id'] = poll_info.key
- zc.async.utils.tracelog.info(
- 'poll %s: %r', poll_info.key, poll_info)
+ if last is None or last != poll_info:
+ zc.async.utils.tracelog.debug(
+ 'poll %s: %r', poll_info.key, poll_info)
def directPoll(self):
if not self.activated:
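The new logging rule in `poll` writes a trace entry only for the first poll or when a poll differs from the previous one, and demotes it from info to debug. A minimal sketch of that pattern with the standard `logging` module; `PollRecorder` is hypothetical, and a plain list stands in for `zc.async.utils.Periodic` (whose `first()` the diff uses to fetch the previous poll, catching `ValueError` when there is none).

```python
import logging

# Stand-in for zc.async.utils.tracelog.
tracelog = logging.getLogger("example.trace")


class PollRecorder:
    """Keep polls, logging one only when it differs from the previous poll."""

    def __init__(self):
        self.polls = []   # (key, poll_info) pairs, newest last
        self.logged = 0   # how many polls were actually written to the log

    def add(self, key, poll_info):
        last = self.polls[-1][1] if self.polls else None
        self.polls.append((key, poll_info))
        # Every poll is kept in memory, but an unchanged poll is not logged.
        if last is None or last != poll_info:
            tracelog.debug("poll %s: %r", key, poll_info)
            self.logged += 1
```

An idle dispatcher polling every few seconds then produces one debug line per change instead of one per poll.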
Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/job.py 2008-04-09 20:43:37 UTC (rev 85201)
@@ -296,7 +296,7 @@
tm.abort()
ct += 1
if ct >= 5:
- res = self._complete(twisted.python.failure.Failure(), tm)
+ res = self._complete(zc.twist.Failure(), tm)
self.resumeCallbacks()
else:
continue
@@ -305,7 +305,7 @@
raise
except:
tm.abort()
- res = self._complete(twisted.python.failure.Failure(), tm)
+ res = self._complete(zc.twist.Failure(), tm)
self.resumeCallbacks()
else:
if self._status == zc.async.interfaces.CALLBACKS:
@@ -332,7 +332,7 @@
raise zc.async.interfaces.BadStatusError(
'can only call fail on a job with NEW, PENDING, ASSIGNED, or '
'ACTIVE status')
- self._complete(twisted.python.failure.Failure(e),
+ self._complete(zc.twist.Failure(e),
transaction.interfaces.ITransactionManager(self))
self.resumeCallbacks()
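In `Job.__call__`, a conflict during commit appears to be retried: the transaction is aborted, the counter `ct` is bumped, and once `ct` reaches 5 the job is completed with a failure instead. A minimal sketch of that loop shape; `ConflictError` is a local stand-in for ZODB's conflict error, `commit_with_retries` is hypothetical, and the captured exception is returned as-is rather than wrapped in a `zc.twist.Failure`.

```python
class ConflictError(Exception):
    """Local stand-in for a transient write conflict (hypothetical)."""


def commit_with_retries(commit, abort, max_attempts=5):
    """Call `commit` until it succeeds or `max_attempts` conflicts occur.

    Returns ("ok", result) on success, or ("failed", exc) with the last
    conflict once the limit is reached, mirroring the ``ct >= 5`` check.
    """
    attempts = 0
    while True:
        try:
            return ("ok", commit())
        except ConflictError as exc:
            abort()  # discard the conflicting transaction state
            attempts += 1
            if attempts >= max_attempts:
                return ("failed", exc)
```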
Modified: zc.async/branches/dev/src/zc/async/job.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/job.txt 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/job.txt 2008-04-09 20:43:37 UTC (rev 85201)
@@ -104,7 +104,7 @@
>>> transaction.commit()
>>> res = j()
>>> j.result
- <twisted.python.failure.Failure exceptions.RuntimeError>
+ <zc.twist.Failure exceptions.RuntimeError>
These are standard twisted Failures, except that frames in the stored
traceback have been converted to reprs, so that we don't keep references
@@ -889,6 +889,8 @@
>>> res = j()
>>> root['demo'].counter # it was not rolled back automatically
2
+ >>> j.result
+ <zc.twist.Failure exceptions.RuntimeError>
>>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
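job.txt notes that the stored traceback's frames are converted to reprs so the failure does not keep references to live objects, which is what keeps the new `zc.twist.Failure` pickle small. A minimal sketch of that idea in plain Python; `SmallFailure` is a hypothetical class, not zc.twist's implementation, and it keeps only strings (type name, message, formatted traceback).

```python
import pickle
import sys
import traceback


class SmallFailure:
    """Illustrative, picklable record of an exception (hypothetical).

    Stores only strings, so no frame objects or local variables are
    retained in memory or written into the pickle.
    """

    def __init__(self, exc=None):
        if exc is None:
            exc = sys.exc_info()[1]  # the exception currently being handled
            if exc is None:
                raise ValueError("no exception to capture")
        cls = type(exc)
        self.type_name = "%s.%s" % (cls.__module__, cls.__qualname__)
        self.message = str(exc)
        self.traceback = "".join(
            traceback.format_exception(cls, exc, exc.__traceback__))

    def __repr__(self):
        return "<SmallFailure %s>" % self.type_name

    def getTraceback(self):
        return self.traceback


try:
    raise RuntimeError("boom")
except RuntimeError:
    failure = SmallFailure()

restored = pickle.loads(pickle.dumps(failure))
print(restored)  # <SmallFailure builtins.RuntimeError>
```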
Modified: zc.async/branches/dev/src/zc/async/monitor.py
===================================================================
--- zc.async/branches/dev/src/zc/async/monitor.py 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/monitor.py 2008-04-09 20:43:37 UTC (rev 85201)
@@ -20,7 +20,7 @@
}
res = dict((k, v) for k, v in tmp.items() if v)
if not res:
- res['seconds'] = 0
+ res['seconds'] = 0.0
return res
# TODO the spelling of this conditional is to support our test setup
# shenanigans. originally was ``isinstance(obj, datetime.datetime)``.
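monitor.py reduces a duration to a dict of its non-zero units and, after this change, falls back to `{'seconds': 0.0}`, presumably so that "seconds" is a float in the zero case too (the README_3 status output reports durations under a "seconds" key). The contents of `tmp` are not shown in the diff; this sketch assumes days, hours, minutes, and fractional seconds, and `duration_dict` is a hypothetical name.

```python
import datetime


def duration_dict(td):
    """Break a timedelta into its non-zero units (assumed unit set)."""
    total = td.total_seconds()
    days, rem = divmod(total, 86400)
    hours, rem = divmod(rem, 3600)
    minutes, seconds = divmod(rem, 60)
    tmp = {
        "days": int(days),
        "hours": int(hours),
        "minutes": int(minutes),
        "seconds": seconds,
    }
    res = dict((k, v) for k, v in tmp.items() if v)
    if not res:
        res["seconds"] = 0.0  # keep "seconds" a float even when zero
    return res


print(duration_dict(datetime.timedelta(minutes=2)))  # {'minutes': 2}
```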
Modified: zc.async/branches/dev/src/zc/async/utils.py
===================================================================
--- zc.async/branches/dev/src/zc/async/utils.py 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/utils.py 2008-04-09 20:43:37 UTC (rev 85201)
@@ -216,11 +216,7 @@
def __nonzero__(self):
for b in self._data.buckets:
- try:
- iter(b).next()
- except StopIteration:
- pass
- else:
+ for ignore in b:
return True
return False
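utils.py replaces the `iter(b).next()` / `StopIteration` handling with a plain loop that returns on the first item it sees. In Python 3 the same hook is spelled `__bool__`; a minimal sketch with a hypothetical `BucketedStore` whose buckets are plain dicts.

```python
class BucketedStore:
    """Hypothetical container holding its data in several buckets."""

    def __init__(self, buckets):
        self.buckets = buckets

    def __bool__(self):
        # True as soon as any bucket yields an item; nothing is copied.
        for b in self.buckets:
            for _ in b:
                return True
        return False

    __nonzero__ = __bool__  # Python 2 spelling, as used in utils.py
```

`any(True for b in self.buckets for _ in b)` is an equivalent one-liner; the explicit loop matches the diff more closely.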
Modified: zc.async/branches/dev/src/zc/async/z3tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/z3tests.py 2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/z3tests.py 2008-04-09 20:43:37 UTC (rev 85201)
@@ -2,6 +2,9 @@
import unittest
from zope.testing import doctest, module
import zope.component.testing
+import zc.ngi.async # to quiet the thread complaints from the testing
+# infrastructure, because there is no API way to stop the z3monitor server or
+# the zc.ngi.async thread. :-(
import zc.async.tests