[Checkins] SVN: zc.async/trunk/src/zc/async/ - Switched dispatcher's in-memory storage of job and poll information to be per

Gary Poster gary at zope.com
Fri Jun 20 17:43:40 EDT 2008


Log message for revision 87606:
  - Switched dispatcher's in-memory storage of job and poll information to be per
    job or per poll, respectively, rather than per time period, so as to try and
    make memory usage more predictable (for instance, whether a dispatcher is
    whipping through lots of jobs quickly, or doing work more slowly).
  
  

Changed:
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/README.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/dispatcher.txt
  U   zc.async/trunk/src/zc/async/utils.py

-=-
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-06-20 17:04:15 UTC (rev 87605)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-06-20 21:43:38 UTC (rev 87606)
@@ -50,6 +50,11 @@
 
 - Depends on new release of zc.twist (1.3)
 
+- Switched dispatcher's in-memory storage of job and poll information to be per
+  job or per poll, respectively, rather than per time period, so as to try and
+  make memory usage more predictable (for instance, whether a dispatcher is
+  whipping through lots of jobs quickly, or doing work more slowly).
+
 1.1.1 (2008-05-14)
 ==================
 

Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2008-06-20 17:04:15 UTC (rev 87605)
+++ zc.async/trunk/src/zc/async/README.txt	2008-06-20 21:43:38 UTC (rev 87606)
@@ -1163,19 +1163,17 @@
      'shortest active': None,
      'shortest failed': (..., 'unnamed'),
      'shortest successful': (..., 'unnamed'),
-     'started': 10,
-     'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
+     'started': 12,
+     'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
      'statistics start': datetime.datetime(2006, 8, 10, 15, 56, 52, 211),
-     'successful': 8,
+     'successful': 10,
      'unknown': 0}
 
-    Although, wait a second--the 'statistics end', the 'started', and the
-    'successful' values have changed!  Why?
+    Note that these statistics eventually rotate out. By default, poll info
+    will eventually rotate out after about 30 minutes (400 polls), and job info
+    will only keep the most recent 200 stats in-memory. To look in history
+    beyond these limits, check your logs.
 
-    To keep memory from rolling out of control, the dispatcher by default
-    only keeps 10 to 12.5 minutes worth of poll information in memory.  For
-    the rest, keep logs and look at them (...and rotate them!).
-
     The ``getActiveJobIds`` list is empty now.
 
     >>> dispatcher.getActiveJobIds()
@@ -1212,9 +1210,9 @@
      'shortest active': None,
      'shortest failed': (..., 'unnamed'),
      'shortest successful': (..., 'unnamed'),
-     'started': 22,
-     'statistics end': datetime.datetime(2006, 8, 10, 15, 46, 52, 211),
+     'started': 24,
+     'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
      'statistics start': datetime.datetime(2006, 8, 10, 15, ...),
-     'successful': 20,
+     'successful': 22,
      'unknown': 0}
     >>> reactor.stop()

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-06-20 17:04:15 UTC (rev 87605)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-06-20 21:43:38 UTC (rev 87606)
@@ -1,38 +1,41 @@
-For release
+Improvements
 
-- be even more pessimistic about memory for saved polls and job info in
-  dispatcher.
+- queues should be pluggable like agent with filter
 
-Bugs and improvements:
+More docs:
 
-- queues should be pluggable like agent with filter
 - show how to broadcast, maybe add conveniences
+
 - show how to use with collapsing jobs (hint to future self: use external queue
   to put in work, and have job(s) just pull what they can see from queue)
+
 - write tips and tricks
+
   * avoid long transactions if possible.  really avoid long transactions
     involving frequently written objects.  Discuss ramifications and
     strategies, such as doing big work in one job, then in callback schedule
     actually writing the data into the hotspot.
+
   * in zope.app.testing.functional tests, zc.async doesn't do well being
     started in a layer's setup because then it is associated with the
     wrapped layer DB, and the test is associated with the DemoStorage wrapper,
     so that the test can see what zc.async does, but zc.async can't see what
     the test does.  The current workaround is to start the dispatcher in the
     test or the test set up (but, again, *not* The layer set up).
+
   * In tests, don't check to see if poll is activated until after the first
     poll. Try ``zc.async.testing.get_poll(zc.async.dispatcher.get(), 0)``, for
     instance.
+
   * In tests, be aware that DemoStorage does not support mvcc and does not
     support conflict resolution, so you may experience ConflictError (write and
     particularly read) problems with it that you will not experience as much,
     or at all, with a storage that supports those features such as FileStorage.
     Notice that all of the tests in this package use FileStorage.
+
   * callbacks should be very, very quick, and very reliable.  If you want to do
     something that might take a while, put another job in the queue
 
-More docs:
-
 - custom retry policies, particularly for non-transactional tasks;
 
 - changing the default retry policy, per-process and per-agent; and

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-06-20 17:04:15 UTC (rev 87605)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-06-20 21:43:38 UTC (rev 87606)
@@ -304,7 +304,8 @@
     thread = None # this is just a placeholder that other code can use the
     # way that zc.async.subscribers.ThreadedDispatcherInstaller.__call__ does.
 
-    def __init__(self, db, reactor, poll_interval=5, uuid=None):
+    def __init__(self, db, reactor, poll_interval=5, uuid=None, jobs_size=200,
+                 polls_size=400):
         if uuid is None:
             uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
         if uuid in _dispatchers:
@@ -315,25 +316,26 @@
         # None at some point, to default to the installed Twisted reactor.
         self.poll_interval = poll_interval
         self.UUID = uuid
-        # we keep these small so that memory usage doesn't balloon too big.
-        # for polls, about 10 minutes at 5 seconds a poll with a fairly large
-        # poll size of maybe 300 bytes means 12 polls/minute, or 120 polls,
-        # * 300 == 36000, about 36K.  Not too bad.  Jobs can take much more
-        # memory depending on the result--a failure takes a lot of memory, for
-        # instance--and there's no real way to guess how many we would get in
-        # a given period of time.  With a wild guess of an average of a K per
-        # job, and storage of 20 minutes, we would get 240K for 12 jobs a
-        # minute, or 1.2M for a job a second, and so on.  That's much bigger,
-        # but we still have a long way to go before we have noticeable memory
-        # consumption on typical production machines.
-        # We keep jobs longer than polls because you may want to find out
-        # about active jobs in a given poll, and jobs will begin their
-        # timeout period when they are begun, so we give a bit of cushion.
-        self.polls = zc.async.utils.Periodic(
-            period=datetime.timedelta(minutes=10), buckets=5) # max of 12.5 min
+        # Let's talk about jobs_size and polls_size.
+        #
+        # Let's take a random guess that data for a job might be about 1K on
+        # average.  That would mean that the default value (keep 200 jobs)
+        # would mean about 200K.
+        #
+        # Let's randomly guess that a poll record averages 300 bytes on
+        # average.  That would mean that the default value (keep 400 polls)
+        # would mean (400*300 bytes == 120000 bytes == ) about 120K.  That
+        # would cover (400 polls * 5 seconds/poll * 1 min/60 seconds == )
+        # just over 33 minutes of polling at the default poll_interval.
+        #
+        # These memory usages should be not really noticeable on typical
+        # production machines. On the other hand, if this is causing you memory
+        # problems, reduce these values when you instantiate your dispatcher.
+        self.polls = zc.async.utils.RollingSet()
+        self.polls.size = polls_size
         self.polls.__parent__ = self
-        self.jobs = zope.bforest.periodic.OOBForest(
-            period=datetime.timedelta(minutes=20), count=9) # max of 22.5 min
+        self.jobs = zc.async.utils.RollingMapping()
+        self.jobs.size = jobs_size
         self.jobs.__parent__ = self
         self._activated = set()
         self.queues = {}
@@ -648,19 +650,7 @@
                 at = zc.async.utils.dt_to_long(before) + 16
             else:
                 at = before + 1
-        for bucket in tuple(self.polls._data.buckets): # freeze order
-            try:
-                if at is None:
-                    key = bucket.minKey()
-                else:
-                    key = bucket.minKey(at)
-                return bucket[key]
-            except (ValueError, KeyError):
-                # ValueError because minKey might not have a value
-                # KeyError because bucket might be cleared in another thread
-                # between minKey and __getitem__
-                pass
-        raise ValueError('no poll matches')
+        return self.polls.first(at)
 
     def iterPolls(self, at=None, before=None, since=None, count=None):
         # `polls` may be mutated during iteration so we don't iterate over it

Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt	2008-06-20 17:04:15 UTC (rev 87605)
+++ zc.async/trunk/src/zc/async/dispatcher.txt	2008-06-20 21:43:38 UTC (rev 87606)
@@ -306,6 +306,9 @@
 Notice that the result is a repr.  If this had been a failure, it would have
 been a (very) verbose traceback [#show_error]_.
 
+Only information about the most recent polls and jobs is kept in memory for
+analysis (400 polls and 200 jobs, by default) [#roll_off]_.
+
 As seen in other documents in zc.async, the job can also be a method of a
 persistent object, affecting a persistent object.
 
@@ -516,3 +519,118 @@
     exceptions.TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'
     *--- End of Failure #... ---
     <BLANKLINE>
+
+.. [#roll_off] Here's the defaults for the maximum most recent polls and jobs.
+
+    >>> dispatcher.polls.size
+    400
+    >>> dispatcher.jobs.size
+    200
+
+    You can set the size of the jobs and polls collections when you instantiate
+    the dispatcher.
+
+    >>> import uuid
+    >>> alt_dispatcher = zc.async.dispatcher.Dispatcher(
+    ...     db, reactor, poll_interval=0.5, uuid=uuid.uuid1(),
+    ...     jobs_size=50, polls_size=100)
+    >>> alt_dispatcher.polls.size
+    100
+    >>> alt_dispatcher.jobs.size
+    50
+
+    Here's an example of the roll-off, creating artificial job and poll
+    information.  First we show polls.
+
+    >>> parallel = []
+    >>> for i in range(10):
+    ...     info = zc.async.dispatcher.PollInfo(ct=i)
+    ...     alt_dispatcher.polls.add(info)
+    ...     parallel.insert(0, info)
+    ...
+    >>> len(alt_dispatcher.polls)
+    10
+    >>> list(alt_dispatcher.polls) == parallel
+    True
+    >>> for i in range(90):
+    ...     info = zc.async.dispatcher.PollInfo(ct=i)
+    ...     alt_dispatcher.polls.add(info)
+    ...     parallel.insert(0, info)
+    ...
+    >>> len(alt_dispatcher.polls)
+    100
+    >>> list(alt_dispatcher.polls) == parallel
+    True
+    >>> info = zc.async.dispatcher.PollInfo(ct=i)
+    >>> alt_dispatcher.polls.add(info)
+    >>> parallel.insert(0, info)
+    >>> list(alt_dispatcher.polls) == parallel
+    False
+    >>> len(alt_dispatcher.polls)
+    100
+    >>> len(parallel)
+    101
+    >>> list(alt_dispatcher.polls) == parallel[:100]
+    True
+    >>> for i in range(10):
+    ...     info = zc.async.dispatcher.PollInfo(ct=i)
+    ...     alt_dispatcher.polls.add(info)
+    ...     parallel.insert(0, info)
+    ...
+    >>> len(alt_dispatcher.polls)
+    100
+    >>> len(parallel)
+    111
+    >>> list(alt_dispatcher.polls) == parallel[:100]
+    True
+
+    Now we show jobs.
+
+    >>> parallel = []
+    >>> for i in range(10):
+    ...     info = {'ct': i}
+    ...     alt_dispatcher.jobs[i] = info
+    ...     parallel.insert(0, (i, info))
+    ...
+    >>> len(alt_dispatcher.jobs)
+    10
+    >>> dict(alt_dispatcher.jobs) == dict(parallel)
+    True
+    >>> for i in range(10, 50):
+    ...     info = {'ct': i}
+    ...     alt_dispatcher.jobs[i] = info
+    ...     parallel.insert(0, (i, info))
+    ...
+    >>> len(list(alt_dispatcher.jobs))
+    50
+    >>> len(alt_dispatcher.jobs)
+    50
+    >>> dict(alt_dispatcher.jobs) == dict(parallel)
+    True
+    >>> i = 50
+    >>> info = {'ct': i}
+    >>> alt_dispatcher.jobs[i] = info
+    >>> parallel.insert(0, (i, info))
+    >>> dict(alt_dispatcher.jobs) == dict(parallel)
+    False
+    >>> len(list(alt_dispatcher.jobs))
+    50
+    >>> len(alt_dispatcher.jobs)
+    50
+    >>> len(parallel)
+    51
+    >>> dict(alt_dispatcher.jobs) == dict(parallel[:50])
+    True
+    >>> for i in range(51, 60):
+    ...     info = {'ct': i}
+    ...     alt_dispatcher.jobs[i] = info
+    ...     parallel.insert(0, (i, info))
+    ...
+    >>> len(list(alt_dispatcher.jobs))
+    50
+    >>> len(alt_dispatcher.jobs)
+    50
+    >>> len(parallel)
+    60
+    >>> dict(alt_dispatcher.jobs) == dict(parallel[:50])
+    True

Modified: zc.async/trunk/src/zc/async/utils.py
===================================================================
--- zc.async/trunk/src/zc/async/utils.py	2008-06-20 17:04:15 UTC (rev 87605)
+++ zc.async/trunk/src/zc/async/utils.py	2008-06-20 21:43:38 UTC (rev 87606)
@@ -18,6 +18,7 @@
 
 import ZEO.Exceptions
 import ZODB.POSException
+import BTrees
 import rwproperty
 import persistent
 import zope.minmax
@@ -103,24 +104,17 @@
     return (datetime.datetime.max -
             datetime.timedelta(days, seconds, microseconds))
 
-class Periodic(persistent.Persistent):
-    # sorts on begin_after from newest to oldest
 
+class AbstractSet(persistent.Persistent):
+
     __parent__ = None
 
-    def __init__(self, period, buckets):
-        self._data = zope.bforest.periodic.LOBForest(period, count=buckets)
+    def __init__(self):
+        self._data = BTrees.family64.IO.BTree()
 
     def clear(self):
         self._data.clear()
 
-    @property
-    def period(self):
-        return self._data.period
-    @rwproperty.setproperty
-    def period(self, value):
-        self._data.period = value
-
     def add(self, item):
         key = zc.async.utils.dt_to_long(datetime.datetime.utcnow()) + 15
         while key in self._data:
@@ -130,99 +124,97 @@
         item.parent = self.__parent__
         item.key = key
 
-    def iter(self, start=None, stop=None):
-        sources = []
-        if start is not None:
-            start = zc.async.utils.dt_to_long(start)
-        if stop is not None:
-            stop = zc.async.utils.dt_to_long(stop)
-        for b in self._data.buckets:
-            i = iter(b.items(start, stop))
-            try:
-                n = i.next()
-            except StopIteration:
-                pass
-            else:
-                sources.append([n, i])
-        sources.sort()
-        length = len(sources)
-        while length > 1:
-            src = sources.pop(0)
-            yield src[0][1]
-            try:
-                src[0] = src[1].next()
-            except StopIteration:
-                length -= 1
-            else:
-                bisect.insort(sources, src)
-        if sources:
-            yield sources[0][0][1]
-            for k, v in sources[0][1]:
-                yield v
-
     def __iter__(self):
-        return self._data.itervalues() # this takes more memory but the pattern
-        # is typically faster than the custom iter above (for relatively
-        # complete iterations of relatively small sets).  The custom iter
-        # has the advantage of the start and stop code.
+        return self._data.itervalues()
 
+    def __len__(self):
+        return len(self._data)
+
+    def __nonzero__(self):
+        return bool(self._data)
+
     def first(self, start=None):
-        original = start
         if start is not None:
-            start = zc.async.utils.dt_to_long(start)
-            minKey = lambda bkt: bkt.minKey(start)
+            if isinstance(start, (int, long)):
+                args = (start,)
+            else:
+                args = (dt_to_long(start),)
         else:
-            minKey = lambda bkt: bkt.minKey()
-        i = iter(self._data.buckets)
-        bucket = i.next()
-        try:
-            key = minKey(bucket)
-        except ValueError:
-            key = None
-        for b in i:
-            try:
-                k = minKey(b)
-            except ValueError:
-                continue
-            if key is None or k < key:
-                bucket, key = b, k
-        if key is None:
-            raise ValueError(original)
-        return bucket[key]
+            args = ()
+        return self._data[self._data.minKey(*args)]
 
     def last(self, stop=None):
-        original = stop
         if stop is not None:
+            if isinstance(stop, (int, long)):
+                args = (stop,)
+            else:
+                args = (dt_to_long(stop),)
+        else:
+            args = ()
+        return self._data[self._data.maxKey(*args)]
+
+    def iter(self, start=None, stop=None):
+        if start is not None:
+            start = zc.async.utils.dt_to_long(start)
+        if stop is not None:
             stop = zc.async.utils.dt_to_long(stop)
-            maxKey = lambda bkt: bkt.maxKey(stop)
+        return self._data.itervalues(start, stop)
+
+
+class Periodic(AbstractSet):
+    # sorts on begin_after from newest to oldest
+
+    def __init__(self, period, buckets):
+        self._data = zope.bforest.periodic.LOBForest(period, count=buckets)
+
+    @property
+    def period(self):
+        return self._data.period
+    @rwproperty.setproperty
+    def period(self, value):
+        self._data.period = value
+
+
+class RollingSet(AbstractSet):
+
+    size = 100
+
+    def add(self, item):
+        super(RollingSet, self).add(item)
+        diff = len(self._data) - self.size
+        while diff > 0:
+            self._data.pop(self._data.maxKey())
+            diff -= 1
+
+
+class RollingMapping(zc.dict.OrderedDict):
+
+    size = 100
+
+    def __setitem__(self, key, value):
+        super(RollingMapping, self).__setitem__(key, value)
+        diff = len(self) - self.size
+        if diff > 0:
+            for key in self._order[:diff]:
+                self._data.pop(key)
+            del self._order[:diff]
+            self._len.change(-diff)
+
+    def maxKey(self, key=None):
+        if key is None:
+            args = ()
         else:
-            maxKey = lambda bkt: bkt.maxKey()
-        i = iter(self._data.buckets)
-        bucket = i.next()
-        try:
-            key = maxKey(bucket)
-        except ValueError:
-            key = None
-        for b in i:
-            try:
-                k = maxKey(b)
-            except ValueError:
-                continue
-            if key is None or k > key:
-                bucket, key = b, k
+            args = (key,)
+        return self._data.maxKey(*args)
+
+    def minKey(self, key=None):
         if key is None:
-            raise ValueError(original)
-        return bucket[key]
+            args = ()
+        else:
+            args = (key,)
+        return self._data.minKey(*args)
 
-    def __nonzero__(self):
-        for b in self._data.buckets:
-            for ignore in b:
-                return True
-        return False
 
-    def __len__(self):
-        return len(self._data)
-
 def never_fail(call, identifier, tm):
     # forever for TransactionErrors; forever, with backoff, for anything else
     trans_ct = 0



More information about the Checkins mailing list