[Checkins] SVN: zc.async/branches/dev/s convert to new, smaller-pickle zc.twist.Failure; save much shorter period of job and poll info in memory; only write poll logs if first or change; delineate trace logs into debug (poll), info (successful job completion), and error (job failure); include telnetlib example in README_3, which in fact exposed a documentation bug, now fixed; use newer zc.twist and now-with-less-bugs zope.bforest.

Gary Poster gary at zope.com
Wed Apr 9 16:43:37 EDT 2008


Log message for revision 85201:
  convert to new, smaller-pickle zc.twist.Failure; save much shorter period of job and poll info in memory; only write poll logs if first or change; delineate trace logs into debug (poll), info (successful job completion), and error (job failure); include telnetlib example in README_3, which in fact exposed a documentation bug, now fixed; use newer zc.twist and now-with-less-bugs zope.bforest.

Changed:
  U   zc.async/branches/dev/setup.py
  U   zc.async/branches/dev/src/zc/async/README.txt
  U   zc.async/branches/dev/src/zc/async/README_3.txt
  U   zc.async/branches/dev/src/zc/async/dispatcher.py
  U   zc.async/branches/dev/src/zc/async/job.py
  U   zc.async/branches/dev/src/zc/async/job.txt
  U   zc.async/branches/dev/src/zc/async/monitor.py
  U   zc.async/branches/dev/src/zc/async/utils.py
  U   zc.async/branches/dev/src/zc/async/z3tests.py

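The "only write poll logs if first or change" behavior from the log message can be sketched roughly as below. This is a minimal illustration rather than the dispatcher's actual code: PollLogger, record, and the logger name are hypothetical, and a plain dict stands in for the poll info.

```python
import logging

# Hypothetical logger name, used only for this sketch.
tracelog = logging.getLogger("example.trace")


class PollLogger:
    """Log a poll only if it is the first one or differs from the previous."""

    def __init__(self, log=tracelog):
        self.log = log
        self.last = None  # no poll seen yet

    def record(self, key, poll_info):
        """Remember the poll; return True if it was written to the log."""
        changed = self.last is None or self.last != poll_info
        if changed:
            # Polls are routine events, so they are logged at debug level.
            self.log.debug("poll %s: %r", key, poll_info)
        self.last = poll_info
        return changed
```

With this shape, a long run of identical (typically empty) polls produces a single log line instead of one line per poll interval.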
-=-
Modified: zc.async/branches/dev/setup.py
===================================================================
--- zc.async/branches/dev/setup.py	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/setup.py	2008-04-09 20:43:37 UTC (rev 85201)
@@ -20,9 +20,11 @@
         'uuid',
         'zc.queue',
         'zc.dict>=1.2.1',
-        'zc.twist>=1.1',
-        'Twisted>=8.0.1', # 8.0 was setuptools compatible
-        'zope.bforest>=1.1',
+        'zc.twist>=1.2',
+        'Twisted>=8.0.1', # 8.0 was setuptools compatible, 8.0.1 had bugfixes.
+        # note that Twisted builds with warnings, at least with py2.4.  It
+        # seems to still build ok.
+        'zope.bforest>=1.1.1',
         'zope.component',
         'zope.i18nmessageid',
         'zope.interface',

Modified: zc.async/branches/dev/src/zc/async/README.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README.txt	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/README.txt	2008-04-09 20:43:37 UTC (rev 85201)
@@ -313,9 +313,9 @@
   completion, either to success or an exception.
 
 - You can look at the result of the call (once COMPLETED).  It might be
-  the result you expect, or a twisted.python.failure.Failure, which is a
-  way to safely communicate exceptions across connections and machines
-  and processes.
+  the result you expect, or a zc.twist.Failure, which is a
+  subclass of twisted.python.failure.Failure, a way to safely communicate
+  exceptions across connections and machines and processes.
 
 -------
 Results
@@ -389,7 +389,7 @@
     >>> wait_for(job)
     >>> t = transaction.begin()
     >>> job.result
-    <twisted.python.failure.Failure exceptions.NameError>
+    <zc.twist.Failure exceptions.NameError>
 
 Failures can provide useful information such as tracebacks.
 
@@ -442,7 +442,7 @@
     >>> transaction.commit()
     >>> wait_for(job)
     >>> job.result
-    <twisted.python.failure.Failure exceptions.NameError>
+    <zc.twist.Failure exceptions.NameError>
     >>> callback1.result
     'I handled a name error'
     >>> callback2.result
@@ -1064,7 +1064,7 @@
     >>> transaction.commit()
     >>> wait_for(job)
     >>> job.result
-    <twisted.python.failure.Failure zc.async.interfaces.AbortedError>
+    <zc.twist.Failure zc.async.interfaces.AbortedError>
     >>> import sys
     >>> job.result.printTraceback(sys.stdout) # doctest: +NORMALIZE_WHITESPACE
     Traceback (most recent call last):
@@ -1135,12 +1135,19 @@
      'shortest active': None,
      'shortest failed': ('\x00\...', 'unnamed'),
      'shortest successful': ('\x00...', 'unnamed'),
-     'started': 12,
-     'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
-     'statistics start': datetime.datetime(...),
-     'successful': 10,
+     'started': 10,
+     'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
+     'statistics start': datetime.datetime(2006, 8, 10, 15, 56, 52, 211),
+     'successful': 8,
      'unknown': 0}
 
+    Although, wait a second--the 'statistics end', the 'started', and the
+    'successful' values have changed!  Why?
+    
+    To keep memory from rolling out of control, the dispatcher by default
+    only keeps 10 to 12.5 minutes worth of poll information in memory.  For
+    the rest, keep logs and look at them (...and rotate them!).
+
     The ``getActiveJobIds`` list shows the new task--which is completed, but
     not as of the last poll, so it's still in the list.
     
@@ -1172,9 +1179,9 @@
      'shortest active': None,
      'shortest failed': ('\x00\...', 'unnamed'),
      'shortest successful': ('\x00...', 'unnamed'),
-     'started': 24,
-     'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
+     'started': 22,
+     'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
      'statistics start': datetime.datetime(2006, 8, 10, 15, 57, 47, 211),
-     'successful': 22,
+     'successful': 20,
      'unknown': 0}
     >>> reactor.stop()

Modified: zc.async/branches/dev/src/zc/async/README_3.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README_3.txt	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/README_3.txt	2008-04-09 20:43:37 UTC (rev 85201)
@@ -99,7 +99,7 @@
 zdaemon.conf::
 
     <environment>
-      ZC_ASYNC_UUID /Users/gary/opt/classifieds/parts/classifieds/uuid.txt
+      ZC_ASYNC_UUID /path/to/uuid.txt
     </environment>
 
 (Other tools, such as supervisor, also can work, of course; their spellings are
@@ -117,6 +117,7 @@
     ...            >
     ... <include package="zope.component" file="meta.zcml" />
     ... <include package="zope.component" />
+    ... <include package="zc.z3monitor" />
     ... <include package="zc.async" file="multidb_dispatcher_policy.zcml" />
     ...
     ... <!-- this is usually handled in Zope applications by the
@@ -161,11 +162,29 @@
     >>> wait_for_result(job)
     42
 
-TODO look at files, maybe even use telnetlib to show that you can get over to
-the monitor port, for amusement.
+We can connect to the monitor server with telnet.
 
-Now we'll "shut down" with a CTRL-C, or SIGINT.
+    >>> import telnetlib
+    >>> tn = telnetlib.Telnet('127.0.0.1', monitor_port)
+    >>> tn.write('async status\n') # immediately disconnects
+    >>> print tn.read_all() # doctest: +ELLIPSIS
+    {
+        "poll interval": {
+            "seconds": ...
+        }, 
+        "status": "RUNNING", 
+        "time since last poll": {
+            "seconds": ...
+        }, 
+        "uptime": {
+            "seconds": ...
+        }, 
+        "uuid": "..."
+    }
+    <BLANKLINE>
 
+Now we'll "shut down" with a CTRL-C, or SIGINT, and clean up.
+
     >>> import signal
     >>> if getattr(os, 'getpid', None) is not None: # UNIXEN, not Windows
     ...     pid = os.getpid()
@@ -194,8 +213,6 @@
     >>> bool(da.activated)
     False
 
-Now we'll clean up.
-
     >>> db.close()
     >>> db.databases['async'].close()
     >>> import shutil
@@ -260,7 +277,7 @@
     >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
 
     >>> import time
-    >>> def get_poll(count = None):
+    >>> def get_poll(count=None): # just a helper used later, not processing
     ...     if count is None:
     ...         count = len(dispatcher.polls)
     ...     for i in range(30):
@@ -271,7 +288,7 @@
     ...         assert False, 'no poll!'
     ... 
 
-    >>> def wait_for_result(job):
+    >>> def wait_for_result(job): # just a helper used later, not processing
     ...     for i in range(30):
     ...         t = transaction.begin()
     ...         if job.status == zc.async.interfaces.COMPLETED:

Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py	2008-04-09 20:43:37 UTC (rev 85201)
@@ -228,10 +228,24 @@
         # None at some point, to default to the installed Twisted reactor.
         self.poll_interval = poll_interval
         self.UUID = uuid
+        # we keep these small so that memory usage doesn't balloon too big.
+        # for polls, about 10 minutes at 5 seconds a poll with a fairly large
+        # poll size of maybe 300 bytes means 12 polls/minute, or 120 polls,
+        # * 300 == 36000, about 36K.  Not too bad.  Jobs can take much more
+        # memory depending on the result--a failure takes a lot of memory, for
+        # instance--and there's no real way to guess how many we would get in
+        # a given period of time.  With a wild guess of an average of a K per
+        # job, and storage of 20 minutes, we would get 240K for 12 jobs a
+        # minute, or 1.2M for a job a second, and so on.  That's much bigger,
+        # but we still have a long way to go before we have noticeable memory
+        # consumption on typical production machines.
+        # We keep jobs longer than polls because you may want to find out
+        # about active jobs in a given poll, and jobs will begin their
+        # timeout period when they are begun, so we give a bit of cushion.
         self.polls = zc.async.utils.Periodic(
-            period=datetime.timedelta(hours=2), buckets=3)
+            period=datetime.timedelta(minutes=10), buckets=5) # max of 12.5 min
         self.jobs = zope.bforest.periodic.OOBForest(
-            period=datetime.timedelta(hours=3), count=4)
+            period=datetime.timedelta(minutes=20), count=9) # max of 22.5 min
         self._activated = set()
         self.queues = {}
         self.dead_pools = []
@@ -250,8 +264,7 @@
                 self.UUID, agent.name, agent._p_oid,
                 agent.queue.name,
                 agent.queue._p_oid, exc_info=True)
-            return zc.twist.sanitize(
-                twisted.python.failure.Failure())
+            return zc.twist.Failure()
         res = self._commit(
             'Error trying to commit getting a job for UUID %s from '
             'agent %s (oid %s) in queue %s (oid %s)' % (
@@ -275,8 +288,7 @@
                         'Repeated transaction error trying to commit in '
                         'zc.async: %s', 
                         debug_string, exc_info=True)
-                    return zc.twist.sanitize(
-                        twisted.python.failure.Failure())
+                    return zc.twist.Failure()
                 retry += 1
             except zc.twist.EXPLOSIVE_ERRORS:
                 transaction.abort()
@@ -286,8 +298,7 @@
                 zc.async.utils.log.error(
                     'Error trying to commit: %s', 
                     debug_string, exc_info=True)
-                return zc.twist.sanitize(
-                    twisted.python.failure.Failure())
+                return zc.twist.Failure()
             else:
                 break
 
@@ -348,8 +359,7 @@
                     except zc.twist.EXPLOSIVE_ERRORS:
                         raise
                     except:
-                        agent_info['error'] = zc.twist.sanitize(
-                            twisted.python.failure.Failure())
+                        agent_info['error'] = zc.twist.Failure()
                         transaction.abort()
                         continue
                     pool = pools.get(name)
@@ -426,11 +436,16 @@
                     self.db.setPoolSize(self.db.getPoolSize() + conn_delta)
         finally:
             transaction.abort()
+            try:
+                last = self.polls.first()
+            except ValueError:
+                last = None
             self.polls.add(poll_info)
             for info in started_jobs:
                 info['poll id'] = poll_info.key
-            zc.async.utils.tracelog.info(
-                'poll %s: %r', poll_info.key, poll_info)
+            if last is None or last != poll_info:
+                zc.async.utils.tracelog.debug(
+                    'poll %s: %r', poll_info.key, poll_info)
 
     def directPoll(self):
         if not self.activated:

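The retention and memory figures in the dispatcher.py comment above can be checked with a few lines of arithmetic. This is only a sketch: max_retention and estimated_bytes are hypothetical helpers, and the rotation rule (the oldest bucket is dropped every period / (buckets - 1)) is an assumption inferred from the "max of 12.5 min" and "max of 22.5 min" comments, not taken from the zc.async or zope.bforest sources. The 300-byte and 1K sizes are the guesses stated in the comment.

```python
from datetime import timedelta


def max_retention(period, buckets):
    """Upper bound on the age of stored data, assuming the oldest of
    `buckets` rotating buckets is dropped every period / (buckets - 1)."""
    return period + period / (buckets - 1)


def estimated_bytes(per_minute, minutes, bytes_each):
    """Rough memory estimate: number of items kept times a guessed size."""
    return per_minute * minutes * bytes_each


# Polls: 10 minute period in 5 buckets; jobs: 20 minute period in 9 buckets.
polls_max = max_retention(timedelta(minutes=10), 5)
jobs_max = max_retention(timedelta(minutes=20), 9)

# One poll every 5 seconds is 12 polls a minute, at a guessed 300 bytes each.
poll_bytes = estimated_bytes(12, 10, 300)

# Jobs at a guessed 1K (taken here as 1000 bytes) each, kept for 20 minutes.
job_bytes_12_per_minute = estimated_bytes(12, 20, 1000)
job_bytes_1_per_second = estimated_bytes(60, 20, 1000)
```

Under these assumptions the numbers agree with the comment: 12.5 and 22.5 minutes of maximum retention, about 36K for polls, and 240K or 1.2M for jobs.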
Modified: zc.async/branches/dev/src/zc/async/job.py
===================================================================
--- zc.async/branches/dev/src/zc/async/job.py	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/job.py	2008-04-09 20:43:37 UTC (rev 85201)
@@ -296,7 +296,7 @@
                 tm.abort()
                 ct += 1
                 if ct >= 5:
-                    res = self._complete(twisted.python.failure.Failure(), tm)
+                    res = self._complete(zc.twist.Failure(), tm)
                     self.resumeCallbacks()
                 else:
                     continue
@@ -305,7 +305,7 @@
                 raise
             except:
                 tm.abort()
-                res = self._complete(twisted.python.failure.Failure(), tm)
+                res = self._complete(zc.twist.Failure(), tm)
                 self.resumeCallbacks()
             else:
                 if self._status == zc.async.interfaces.CALLBACKS:
@@ -332,7 +332,7 @@
             raise zc.async.interfaces.BadStatusError(
                 'can only call fail on a job with NEW, PENDING, ASSIGNED, or '
                 'ACTIVE status')
-        self._complete(twisted.python.failure.Failure(e),
+        self._complete(zc.twist.Failure(e),
                        transaction.interfaces.ITransactionManager(self))
         self.resumeCallbacks()
 

Modified: zc.async/branches/dev/src/zc/async/job.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/job.txt	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/job.txt	2008-04-09 20:43:37 UTC (rev 85201)
@@ -104,7 +104,7 @@
     >>> transaction.commit()
     >>> res = j()
     >>> j.result
-    <twisted.python.failure.Failure exceptions.RuntimeError>
+    <zc.twist.Failure exceptions.RuntimeError>
 
 These are standard twisted Failures, except that frames in the stored
 traceback have been converted to reprs, so that we don't keep references
@@ -889,6 +889,8 @@
     >>> res = j()
     >>> root['demo'].counter # it was not rolled back automatically
     2
+    >>> j.result
+    <zc.twist.Failure exceptions.RuntimeError>
     >>> print j.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
     Traceback (most recent call last):
     ...

Modified: zc.async/branches/dev/src/zc/async/monitor.py
===================================================================
--- zc.async/branches/dev/src/zc/async/monitor.py	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/monitor.py	2008-04-09 20:43:37 UTC (rev 85201)
@@ -20,7 +20,7 @@
                   }
             res = dict((k, v) for k, v in tmp.items() if v)
             if not res:
-                res['seconds'] = 0
+                res['seconds'] = 0.0
             return res
         # TODO the spelling of this conditional is to support our test setup
         # shenanigans.  originally was ``isinstance(obj, datetime.datetime)``.

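The monitor.py hunk above touches the fallback used when a time span has no nonzero parts. A rough sketch of that pattern follows. The function name encode_timedelta and the "days" key are assumptions made for illustration; only the "seconds" key and the 0.0 fallback appear in the diff, and the real encoder may split the value differently.

```python
import datetime


def encode_timedelta(delta):
    """Return a dict of the nonzero parts of `delta`.

    An empty span is reported as {'seconds': 0.0} rather than as {}.
    """
    parts = {
        "days": delta.days,
        # Fold microseconds into a float number of seconds.
        "seconds": delta.seconds + delta.microseconds / 1e6,
    }
    res = {k: v for k, v in parts.items() if v}
    if not res:
        # A float here keeps "seconds" the same type in every case
        # within this sketch.
        res["seconds"] = 0.0
    return res
```

A consumer of the monitor output can then always expect at least a "seconds" entry, even for a zero-length span.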
Modified: zc.async/branches/dev/src/zc/async/utils.py
===================================================================
--- zc.async/branches/dev/src/zc/async/utils.py	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/utils.py	2008-04-09 20:43:37 UTC (rev 85201)
@@ -216,11 +216,7 @@
 
     def __nonzero__(self):
         for b in self._data.buckets:
-            try:
-                iter(b).next()
-            except StopIteration:
-                pass
-            else:
+            for ignore in b:
                 return True
         return False
 

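The simplified __nonzero__ in the utils.py hunk above relies on the fact that entering a for loop body proves the iterable has at least one item. A self-contained sketch of the same idiom is below. Buckets is a hypothetical stand-in, not the zc.async class, and it is written so that it also runs on Python 3, where the method is spelled __bool__.

```python
class Buckets:
    """Hypothetical container that spreads its items over several buckets."""

    def __init__(self, buckets):
        self.buckets = buckets  # any iterables, for example dicts or lists

    def __bool__(self):
        for bucket in self.buckets:
            for _ in bucket:
                # Reaching the loop body means this bucket has an item.
                return True
        return False

    __nonzero__ = __bool__  # Python 2 spelling, as used in the diff
```

The check stops at the first item found, so it never iterates over a whole bucket.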
Modified: zc.async/branches/dev/src/zc/async/z3tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/z3tests.py	2008-04-09 20:12:26 UTC (rev 85200)
+++ zc.async/branches/dev/src/zc/async/z3tests.py	2008-04-09 20:43:37 UTC (rev 85201)
@@ -2,6 +2,9 @@
 import unittest
 from zope.testing import doctest, module
 import zope.component.testing
+import zc.ngi.async # to quiet the thread complaints from the testing
+# infrastructure, because there is no API way to stop the z3monitor server or
+# the zc.ngi.async thread. :-(
 
 import zc.async.tests
 
