[Zope-Checkins] CVS: Zope/lib/python/Products/Transience - Transience.py:1.32.12.2.2.1

Chris McDonough chrism at plope.com
Sun Feb 29 03:43:19 EST 2004


Update of /cvs-repository/Zope/lib/python/Products/Transience
In directory cvs.zope.org:/tmp/cvs-serv15188

Modified Files:
      Tag: chrism-sessiongeddon
	Transience.py 
Log Message:
New Transience implementation.  This might even hold up under load.


=== Zope/lib/python/Products/Transience/Transience.py 1.32.12.2 => 1.32.12.2.2.1 ===
--- Zope/lib/python/Products/Transience/Transience.py:1.32.12.2	Wed Jan 21 14:58:58 2004
+++ Zope/lib/python/Products/Transience/Transience.py	Sun Feb 29 03:42:48 2004
@@ -11,48 +11,42 @@
 #
 ##############################################################################
 """
-Transient Object Container Class ('timeslice'-based design).
+Transient Object Container Class ('timeslice'-based design, no index).
 
 $Id$
 """
 
 __version__='$Revision$'[11:-2]
 
+import math
+import time
+import random
+import sys
+import os
+import thread
+from cgi import escape
+
 import Globals
 from Globals import HTMLFile
 from TransienceInterfaces import Transient, DictionaryLike, ItemWithId,\
      TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\
      StringKeyedHomogeneousItemContainer, TransientItemContainer
-from OFS.SimpleItem import SimpleItem
+
+from BTrees.Length import Length
+from BTrees.OOBTree import OOBTree
+from BTrees.IOBTree import IOBTree
+from ZODB.POSException import ConflictError
+
 from Persistence import Persistent
-from Acquisition import Implicit
+from OFS.SimpleItem import SimpleItem
 from AccessControl import ClassSecurityInfo, getSecurityManager
 from AccessControl.SecurityManagement import newSecurityManager, \
      setSecurityManager
 from AccessControl.User import nobody
-from BTrees.OOBTree import OOBTree, OOBucket, OOSet
-from BTrees.IOBTree import IOBTree
-from BTrees.Length import Length
 from zLOG import LOG, WARNING, BLATHER
-import os.path
-import os
-import math, sys, random
-import time
-from types import InstanceType
-from TransientObject import TransientObject
-import thread
-import ThreadLock
-import Queue
-from cgi import escape
-
-_marker = []
-
-DEBUG = os.environ.get('Z_TOC_DEBUG', '')
 
-class MaxTransientObjectsExceeded(Exception): pass
+from TransientObject import TransientObject
 
-MIN_SPARE_BUCKETS = 10 # minimum number of transient buckets to keep spare
-PERIOD = 20 # attempt housekeeping every PERIOD seconds
 ADD_CONTAINER_PERM = 'Add Transient Object Container'
 MGMT_SCREEN_PERM = 'View management screens'
 ACCESS_CONTENTS_PERM = 'Access contents information'
@@ -60,8 +54,18 @@
 ACCESS_TRANSIENTS_PERM = 'Access Transient Objects'
 MANAGE_CONTAINER_PERM = 'Manage Transient Object Container'
 
-constructTransientObjectContainerForm = HTMLFile(
-    'dtml/addTransientObjectContainer', globals())
+
+PERIOD = 20 # attempt housekeeping every PERIOD seconds
+SPARE_BUCKETS = 15 # number of buckets to keep spare
+STRICT = os.environ.get('Z_TOC_STRICT', '')
+DEBUG = int(os.environ.get('Z_TOC_DEBUG', 0))
+
+_marker = []
+
+def setStrict(on=''):
+    """ Turn on assertions (which may cause conflicts) """
+    global STRICT
+    STRICT = on
 
 def TLOG(*args):
     sargs = []
@@ -69,7 +73,12 @@
     sargs.append(str(time.time()))
     for arg in args:
         sargs.append(str(arg))
-    LOG('Transience', BLATHER, ' '.join(sargs))
+    msg = ' '.join(sargs)
+    print msg
+    LOG('Transience', BLATHER, msg)
+
+constructTransientObjectContainerForm = HTMLFile(
+    'dtml/addTransientObjectContainer', globals())
 
 def constructTransientObjectContainer(self, id, title='', timeout_mins=20,
     addNotification=None, delNotification=None, limit=0, REQUEST=None):
@@ -80,6 +89,8 @@
     if REQUEST is not None:
         return self.manage_main(self, REQUEST, update_menu=1)
 
+class MaxTransientObjectsExceeded(Exception): pass
+
 class TransientObjectContainer(SimpleItem):
     """ Object which contains items that are automatically flushed
     after a period of inactivity """
@@ -87,39 +98,6 @@
     meta_type = "Transient Object Container"
     icon = "misc_/Transience/datacontainer.gif"
 
-    # chrism 6/20/2002
-    # I was forced to make this a mostly "synchronized" class, using
-    # a single ThreadLock instance ("lock" below).   I realize this
-    # is paranoid and even a little sloppy. ;-)
-    #
-    # Rationale: in high-conflict situations without this lock, the
-    # index and the "data" (bucket) structure slowly get out of sync with
-    # one another.  I'm only sure about one thing when it comes to this:
-    # I don't completely understand why.  So, I'm not going to worry about
-    # it (indefinitely) as the locking solves it. "Slow and steady" is better
-    # than "fast and broken".
-    lock = ThreadLock.allocate_lock()
-
-    # notify_queue is a queue in which deindexed objects are placed
-    # for later processing by housekeeping, which calls the
-    # "delete notifier" at appropriate times.  As threads pass through
-    # the housekeeping stage, they pull any unnotified objects from this
-    # queue and call the delete notifier.  We use a queue here in order
-    # to not "double-notify" when two threads are doing housekeeping
-    # at the same time.  Note that there may be a case where a conflict
-    # error is raised and the results of a delete notifier are not
-    # committed, but that is better than calling the delete notifier
-    # *again* on the retry.
-    notify_queue = Queue.Queue()
-
-    # replentish queue is a size-one queue.   It is used as optimization
-    # to avoid conflicts. If you're running low on buckets, an entry is
-    # placed in the replentish queue.  The next thread that does housekeeping
-    # to notice the entry will extend the buckets.  Because queues are thread-
-    # safe, more than one thread will not attempt to replentish at the same
-    # time.
-    replentish_queue = Queue.Queue(1)
-
     __implements__ = (ItemWithId,
                       StringKeyedHomogeneousItemContainer,
                       TransientItemContainer
@@ -162,42 +140,373 @@
         self._reset()
         self._setTimeout(timeout_mins)
         self._setLimit(limit)
-        self._addCallback = None
-        self._delCallback = None
         self.setDelNotificationTarget(delNotification)
         self.setAddNotificationTarget(addNotification)
 
+    # helpers
+
+    def _setTimeout(self, timeout_mins):
+        if type(timeout_mins) is not type(1):
+            raise TypeError, (escape(`timeout_mins`), "Must be integer")
+        self._timeout_secs = t_secs = timeout_mins * 60
+        # timeout_slices == fewest number of timeslices that's >= t_secs
+        self._timeout_slices=int(math.ceil(float(t_secs)/PERIOD))
+
+    def _setLimit(self, limit):
+        if type(limit) is not type(1):
+            raise TypeError, (escape(`limit`), "Must be integer")
+        self._limit = limit
+
+    def _reset(self):
+        """ Reset ourselves to a sane state (deletes all content) """
+        # _data contains a mapping of f-of-time(int) (aka "slice") to
+        # "bucket".  Each bucket will contain a set of transient items.
+        # Transient items move automatically from bucket-to-bucket inside
+        # of the _data structure based on last access time (e.g.
+        # "get" calls), escaping destruction only if they move quickly
+        # enough.
+        # We make enough buckets initially to last us a while, and
+        # we subsequently extend _data with fresh buckets and remove old
+        # buckets as necessary during normal operations (see
+        # _housekeep()).
+        self._data = IOBTree()
+
+        # populate _data with some number of buckets, each of which
+        # is "current" for its timeslice key
+        new_slices = getTimeslices(getCurrentTimeslice(), SPARE_BUCKETS*2)
+        for i in new_slices:
+            self._data[i] = OOBTree()
+
+        # create an Increaser for max timeslice
+        self._max_timeslice = Increaser(max(new_slices))
+
+        # our "__len__" is the length of _index.
+        # we need to maintain the length of the index structure separately
+        # because getting the length of a BTree is very expensive.
+        try: self.__len__.set(0)
+        except AttributeError: self.__len__ = self.getLen = Length()
+
+    def _getCurrentSlices(self, now):
+        begin = now+PERIOD - (PERIOD * self._timeout_slices)
+        return getTimeslices(begin, self._timeout_slices)
+
+    def _move_item(self, k, current_ts, default=None):
+        if self._roll(current_ts, 'replentish'):
+            self._replentish(current_ts)
+
+        if self._roll(current_ts, 'gc'):
+            self._gc(current_ts)
+
+        STRICT and _assert(self._data.has_key(current_ts))
+        current = self._getCurrentSlices(current_ts)
+        found_ts = None
+
+        for ts in current:
+            bucket = self._data.get(ts)
+            if hasattr(bucket, 'has_key') and bucket.has_key(k):
+                found_ts = ts
+                break
+
+        if found_ts is None:
+            return default
+
+        bucket = self._data[found_ts]
+        item = bucket[k]
+
+        if current_ts != found_ts:
+            del bucket[k]
+            self._data[current_ts][k] = item
+
+        if hasattr(item, 'setLastAccessed'):
+            item.setLastAccessed()
+        return item
+
+    def _all(self):
+        
+        current_ts = getCurrentTimeslice()
+
+        if self._roll(current_ts, 'replentish'):
+            self._replentish(current_ts)
+
+        if self._roll(current_ts, 'gc'):
+            self._gc(current_ts)
+
+        STRICT and _assert(self._data.has_key(current_ts))
+        current = self._getCurrentSlices(current_ts)
+
+        current.reverse() # overwrite older with newer
+
+        d = {}
+        for ts in current:
+            bucket = self._data.get(ts)
+            if bucket is None:
+                continue
+            for k,v in bucket.items():
+                d[k] = self._wrap(v)
+
+        return d
+
+    def keys(self):
+        return self._all().keys()
+
+    def rawkeys(self, current_ts):
+        # for debugging
+        current = self._getCurrentSlices(current_ts)
+
+        current.reverse() # overwrite older with newer
+
+        d = {}
+        for ts in current:
+            bucket = self._data.get(ts, None)
+            if bucket is None:
+                continue
+            for k,v in bucket.items():
+                d[k] = self._wrap(v)
+
+        return d
+
+    def items(self):
+        return self._all().items()
+
+    def values(self):
+        return self._all().values()
+
+    def _wrap(self, item):
+        if hasattr(item, '__of__'):
+            item = item.__of__(self)
+        return item
+
+    def __getitem__(self, k):
+        current_ts = getCurrentTimeslice()
+        item = self._move_item(k, current_ts, _marker)
+        STRICT and _assert(self._data.has_key(current_ts))
+
+        if item is _marker:
+            raise KeyError, k
+
+        return self._wrap(item)
+
+    def __setitem__(self, k, v):
+        current_ts = getCurrentTimeslice()
+        item = self._move_item(k, current_ts, _marker)
+        STRICT and _assert(self._data.has_key(current_ts))
+        if item is _marker:
+            # the key didnt already exist, this is a new item
+            if self._limit and len(self) >= self._limit:
+                LOG('Transience', WARNING,
+                    ('Transient object container %s max subobjects '
+                     'reached' % self.getId())
+                    )
+                raise MaxTransientObjectsExceeded, (
+                 "%s exceeds maximum number of subobjects %s" %
+                 (len(self), self._limit))
+            self.__len__.change(1)
+        current_bucket = self._data[current_ts]
+        current_bucket[k] = v
+        self.notifyAdd(v)
+        # change the TO's last accessed time
+        if hasattr(v, 'setLastAccessed'):
+            v.setLastAccessed()
+
+    def __delitem__(self, k):
+        current_ts = getCurrentTimeslice()
+        item = self._move_item(k, current_ts)
+        STRICT and _assert(self._data.has_key(current_ts))
+        del self._data[current_ts][k]
+        self.__len__.change(-1)
+        return current_ts, item
+
+    security.declareProtected(ACCESS_TRANSIENTS_PERM, 'get')
+    def get(self, k, default=None):
+        current_ts = getCurrentTimeslice()
+        item = self._move_item(k, current_ts, _marker)
+        STRICT and _assert(self._data.has_key(current_ts))
+        if item is _marker:
+            return default
+        return self._wrap(item)
+
+    security.declareProtected(ACCESS_TRANSIENTS_PERM, 'has_key')
+    def has_key(self, k):
+        current_ts = getCurrentTimeslice()
+        item = self._move_item(k, current_ts, _marker)
+        STRICT and _assert(self._data.has_key(current_ts))
+        if item is not _marker:
+            return True
+        return False
+
+    def _roll(self, now, reason):
+        """
+        Roll the dice to see if we're the lucky thread that does
+        bucket replentishment or gc.  This method is guaranteed to return
+        true at some point as the difference between high and low naturally
+        diminishes to zero.
+
+        The reason we do the 'random' dance in the last part of this
+        is to minimize the chance that two threads will attempt to
+        do housekeeping at the same time (causing conflicts).
+        """
+        low = now/PERIOD
+        high = self._max_timeslice()/PERIOD
+        if high <= low:
+            # we really need to win this roll because we have no
+            # spare buckets (and no valid values to provide to randrange), so
+            # we rig the toss.
+            DEBUG and TLOG('_roll: %s rigged toss' % reason)
+            return True
+        else:
+            # we're not in an emergency bucket shortage, so we can take
+            # our chances during the roll.  It's highly unlikely that two
+            # threads will win the roll simultaneously, so we avoid a certain
+            # class of conflicts here.
+            if random.randrange(low, high) == low: # WINNAH!
+                DEBUG and TLOG("_roll: %s roll winner" % reason)
+                return True
+        DEBUG and TLOG("_roll: %s roll loser" % reason)
+        return False
+
+    def _replentish(self, now):
+        # available_spares == the number of "spare" buckets that exist in
+        # "_data"
+        max_ts = self._max_timeslice()
+        available_spares = (max_ts-now) / PERIOD
+        DEBUG and TLOG('_replentish: now = %s' % now)
+        DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
+        DEBUG and TLOG('_replentish: available_spares = %s'
+                       % available_spares)
+
+        if available_spares < SPARE_BUCKETS:
+            if max_ts < now:
+                replentish_start = now
+                replentish_end = now + (PERIOD * SPARE_BUCKETS)
+
+            else:
+                replentish_start = max_ts + PERIOD
+                replentish_end = max_ts + (PERIOD * SPARE_BUCKETS)
+
+            DEBUG and TLOG('_replentish: replentish_start = %s' %
+                           replentish_start)
+            DEBUG and TLOG('_replentish: replentish_end = %s'
+                           % replentish_end)
+            # n is the number of buckets to create
+            n = (replentish_end - replentish_start) / PERIOD
+            new_buckets = getTimeslices(replentish_start, n)
+            new_buckets.reverse()
+            STRICT and _assert(new_buckets)
+            DEBUG and TLOG('_replentish: adding %s new buckets' % n)
+            DEBUG and TLOG('_replentish: buckets to add = %s'
+                           % new_buckets)
+            for k in new_buckets:
+                STRICT and _assert(not self._data.has_key(k))
+                try:
+                    self._data[k] = OOBTree()
+                except ConflictError:
+                    DEBUG and TLOG('_replentish: conflict when adding %s' % k)
+                    time.sleep(random.choice([0.1, 0.2, 0.3])) # add entropy
+                    raise
+            self._max_timeslice.set(max(new_buckets))
+
+    def _gc(self, now=None):
+        if now is None:
+            now = getCurrentTimeslice() # for unit tests
+        max_ts = now  - (PERIOD * (self._timeout_slices + 1))
+        keys = self._data.keys(None, max_ts)
+
+        to_notify = []
+
+        for key in list(self._data.keys(None, max_ts)):
+            assert(key <= max_ts)
+            STRICT and _assert(self._data.has_key(key))
+            for v in self._data[key].values():
+                to_notify.append(v)
+            del self._data[key]
+
+        for v in to_notify:
+            self.notifyDel(v)
+
+    def notifyAdd(self, item):
+        DEBUG and TLOG('notifyAdd with %s' % item)
+        callback = self._getCallback(self._addCallback)
+        if callback is None:
+            return
+        self._notify(item, callback, 'notifyAdd')
+
+    def notifyDel(self, item):
+        DEBUG and TLOG('notifyDel with %s' % item)
+        callback = self._getCallback(self._delCallback)
+        if callback is None:
+            return
+        self._notify(item, callback, 'notifyDel' )
+
+    def _getCallback(self, callback):
+        if type(callback) is type(''):
+            try:
+                method = self.unrestrictedTraverse(callback)
+            except (KeyError, AttributeError):
+                path = self.getPhysicalPath()
+                err = 'No such method %s in %s %s'
+                LOG('Transience',
+                    WARNING,
+                    err % (callback, '/'.join(path), name),
+                    error=sys.exc_info()
+                    )
+                return
+        else:
+            method = callback
+        return method
+
+    def _notify(self, item, callback, name):
+        if callable(callback):
+            sm = getSecurityManager()
+            try:
+                user = sm.getUser()
+                try:
+                    newSecurityManager(None, nobody)
+                    callback(item, self)
+                except:
+                    # dont raise, just log
+                    path = self.getPhysicalPath()
+                    LOG('Transience',
+                        WARNING,
+                        '%s failed when calling %s in %s' % (name,callback,
+                                                        '/'.join(path)),
+                        error=sys.exc_info()
+                        )
+            finally:
+                setSecurityManager(sm)
+        else:
+            err = '%s in %s attempted to call non-callable %s'
+            path = self.getPhysicalPath()
+            LOG('Transience',
+                WARNING,
+                err % (name, '/'.join(path), callback),
+                error=sys.exc_info()
+                )
+
+    def getId(self):
+        return self.id
+
     security.declareProtected(CREATE_TRANSIENTS_PERM, 'new_or_existing')
     def new_or_existing(self, key):
-        self.lock.acquire()
-        try:
-            DEBUG and TLOG('new_or_existing called with %s' % key)
-            notfound = []
-            item  = self.get(key, notfound)
-            if item is notfound:
-                # intentionally dont call "new" here in order to avoid another
-                # call to "get"
-                item = TransientObject(key)
-                self[key] = item
-                self.notifyAdd(item)
-            return item.__of__(self)
-        finally:
-            self.lock.release()
+        DEBUG and TLOG('new_or_existing called with %s' % key)
+        item = self.get(key, _marker)
+        if item is _marker:
+            item = TransientObject(key)
+            self[key] = item
+            item = item.__of__(self)
+        return item
 
     security.declareProtected(CREATE_TRANSIENTS_PERM, 'new')
     def new(self, key):
-        self.lock.acquire()
-        try:
-            if type(key) is not type(''):
-                raise TypeError, (key, "key is not a string type")
-            if self.has_key(key):
-                raise KeyError, "cannot duplicate key %s" % key
-            item = TransientObject(key)
-            self[key] = item
-            self.notifyAdd(item)
-            return item.__of__(self)
-        finally:
-            self.lock.release()
+        DEBUG and TLOG('new called with %s' % key)
+        if type(key) is not type(''):
+            raise TypeError, (key, "key is not a string type")
+        if self.has_key(key):
+            raise KeyError, "cannot duplicate key %s" % key
+        item = TransientObject(key)
+        self[key] = item
+        return item.__of__(self)
+
+    # TransientItemContainer methods
 
     security.declareProtected(MANAGE_CONTAINER_PERM, 'setTimeoutMinutes')
     def setTimeoutMinutes(self, timeout_mins):
@@ -206,7 +515,6 @@
             self._setTimeout(timeout_mins)
             self._reset()
 
-    security.declareProtected(MGMT_SCREEN_PERM, 'getTimeoutMinutes')
     def getTimeoutMinutes(self):
         """ """
         return self._timeout_secs / 60
@@ -228,8 +536,6 @@
 
     security.declareProtected(MANAGE_CONTAINER_PERM,'setAddNotificationTarget')
     def setAddNotificationTarget(self, f):
-        # We should assert that the callback function 'f' implements
-        # the TransientNotification interface
         self._addCallback = f
 
     security.declareProtected(MGMT_SCREEN_PERM, 'getDelNotificationTarget')
@@ -238,70 +544,13 @@
 
     security.declareProtected(MANAGE_CONTAINER_PERM,'setDelNotificationTarget')
     def setDelNotificationTarget(self, f):
-        # We should assert that the callback function 'f' implements
-        # the TransientNotification interface
         self._delCallback = f
 
-    def notifyAdd(self, item):
-        if self._addCallback:
-            self._notify(item, 'add')
-
-    def notifyDestruct(self, item):
-        if self._delCallback:
-            self._notify(item, 'destruct')
-
-    def _notify(self, items, kind):
-        if not type(items) in [type([]), type(())]:
-            items = [items]
-
-        if kind =='add':
-            name = 'notifyAdd'
-            callback = self._addCallback
-        else:
-            name = 'notifyDestruct'
-            callback = self._delCallback
-        if type(callback) is type(''):
-            try:
-                method = self.unrestrictedTraverse(callback)
-            except (KeyError, AttributeError):
-                path = self.getPhysicalPath()
-                err = 'No such method %s in %s %s'
-                LOG('Transience',
-                    WARNING,
-                    err % (callback, '/'.join(path), name),
-                    error=sys.exc_info()
-                    )
-                return
-        else:
-            method = callback
-
-        for item in items:
-            if callable(method):
-                sm = getSecurityManager()
-                try:
-                    user = sm.getUser()
-                    try:
-                        newSecurityManager(None, nobody)
-                        method(item, self)
-                    except:
-                        # dont raise, just log
-                        path = self.getPhysicalPath()
-                        LOG('Transience',
-                            WARNING,
-                            '%s failed when calling %s in %s' % (name,callback,
-                                                            '/'.join(path)),
-                            error=sys.exc_info()
-                            )
-                finally:
-                    setSecurityManager(sm)
-            else:
-                err = '%s in %s attempted to call non-callable %s'
-                path = self.getPhysicalPath()
-                LOG('Transience',
-                    WARNING,
-                    err % (name, '/'.join(path), callback),
-                    error=sys.exc_info()
-                    )
+    security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
+    def nudge(self):
+        """ Used by mgmt interface to maybe do housekeeping each time
+        a screen is shown """
+        pass # XXX needed?
 
     security.declareProtected(MANAGE_CONTAINER_PERM,
         'manage_changeTransientObjectContainer')
@@ -325,578 +574,32 @@
                 self, REQUEST, manage_tabs_message='Changes saved.'
                 )
 
-    def _setTimeout(self, timeout_mins):
-        if type(timeout_mins) is not type(1):
-            raise TypeError, (escape(`timeout_mins`), "Must be integer")
-        self._timeout_secs = t_secs = timeout_mins * 60
-        # timeout_slices == fewest number of timeslices that's >= t_secs
-        self._timeout_slices=int(math.ceil(float(t_secs)/self._period))
-
-    def _setLimit(self, limit):
-        if type(limit) is not type(1):
-            raise TypeError, (escape(`limit`), "Must be integer")
-        self._limit = limit
-
-    security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
-    def nudge(self):
-        """ Used by mgmt interface to maybe turn the ring each time
-        a screen is shown """
-        self._getCurrentBucket()
-
-    def _getCurrentTimeslice(self):
-        """
-        Return an integer representing the 'current' timeslice.
-        The current timeslice is guaranteed to be the same integer
-        within a 'slice' of time based on a divisor of 'period'.
-        'period' is the number of seconds in a slice.
-        """
-        period = self._period
-        now = time.time()
-        low = int(math.floor(now)) - period + 1
-        high = int(math.ceil(now)) + 1
-        for x in range(low, high):
-            if x % period == 0:
-                return x
-
-    def _getTimeslices(self, begin, n):
-        """ Get a list of future timeslice integers of 'n' size """
-        l = []
-        for x in range(n):
-            l.append(begin + (x * self._period))
-        return l
-
-    def _getIndex(self):
-        """ returns the index, a mapping of TOC key to bucket """
-        self.lock.acquire()
-        try:
-            if self._data is None:
-                # do in-place upgrade of old instances
-                self._upgrade()
-            return self._index
-        finally:
-            self.lock.release()
-
-    def _upgrade(self):
-        """ upgrade older ring-based (2.5.X) TOC instances """
-        self.lock.acquire()
-        try:
-            self._reset()
-            timeout_mins = self._timeout_secs / 60
-            self._setTimeout(timeout_mins)
-            # iterate over all the buckets in the ring
-            for bucket, dump_after in self._ring._data:
-                # get all TOs in the ring and call our __setitem__
-                for k, v in bucket.items():
-                    self[k] = v
-            # we probably should delete the old "_ring" attribute here,
-            # but leave it around in case folks switch back to 2.5.X
-        finally:
-            self.lock.release()
-
-    def _reset(self):
-        """ Reset ourselves to a sane state (deletes all content) """
-        self.lock.acquire()
-        try:
-            # set the period (the timeslice length)
-            self._period = PERIOD
-
-            # set the number of minimum spare buckets
-            self._min_spare_buckets = MIN_SPARE_BUCKETS
-
-            # _data contains a mapping of f-of-time(int) (aka "slice") to
-            # "bucket".  Each bucket will contain a set of transient items.
-            # Transient items move automatically from bucket-to-bucket inside
-            # of the _data structure based on last access time (e.g.
-            # "get" calls), escaping destruction only if they move quickly
-            # enough.
-            # We make enough buckets initially to last us a while, and
-            # we subsequently extend _data with fresh buckets and remove old
-            # buckets as necessary during normal operations (see
-            # _housekeep()).
-            self._data = IOBTree()
-
-            # populate _data with some number of buckets, each of which
-            # is "current" for its timeslice key
-            for i in self._getTimeslices(self._getCurrentTimeslice(),
-                                         self._min_spare_buckets*2):
-                self._data[i] = OOBTree()
-
-            # _index is a mapping of transient item key -> slice, letting
-            # us quickly figure out which bucket in the _data mapping
-            # contains the transient object related to the key
-            self._index = OOBTree()
-
-            # our "__len__" is the length of _index.
-            # we need to maintain the length of the index structure separately
-            # because getting the length of a BTree is very expensive.
-            try: self.__len__.set(0)
-            except AttributeError: self.__len__ = self.getLen = Length()
-
-            # set up last_timeslice and deindex_next integer pointers
-            # we set them to the current timeslice as it's the only sane
-            # thing to do
-            self._last_timeslice=Increaser(self._getCurrentTimeslice())
-            self._deindex_next=Increaser(self._getCurrentTimeslice())
-        finally:
-            self.lock.release()
-
-    def _getCurrentBucket(self):
-        """
-        Do housekeeping if necessary, then return the 'current' bucket.
-        """
-        self.lock.acquire()
-        try:
-            # do in-place upgrade of old "ring-based" instances if
-            # we've just upgraded from Zope 2.5.X
-            if self._data is None:
-                self._upgrade()
-
-            # data is the mapping from timeslice to bucket
-            data = self._data
-
-            # period == number of seconds in a slice
-            period = self._period
-            
-            # pnow == the current timeslice
-            pnow = self._getCurrentTimeslice()
-
-            # pprev = the true previous timeslice in relation to pnow
-            pprev = pnow - period
-
-            # plast == the last timeslice under which we did housekeeping
-            plast = self._last_timeslice()
-
-            if not data.has_key(pnow):
-                # we were asleep a little too long, we don't even have a
-                # current bucket; we create one for ourselves.
-                # XXX - currently this ignores going back in time.
-                DEBUG and TLOG('_getCurrentBucket: creating current bucket!')
-                data[pnow] = OOBTree()
-
-            if pnow <= plast:
-                # If we went "back in time" or if the timeslice hasn't
-                # changed, dont try to do housekeeping.
-                # Instead, just return the current bucket.
-                return pnow
-
-            # the current timeslice has changed since the last time we did
-            # housekeeping, so we're going to see if we need to finalize
-            # anything.
-            DEBUG and TLOG('_getCurrentBucket: new timeslice (pnow) %s' % pnow)
-
-            # pmax == the last timeslice integer kept by _data as a key.
-            pmax = data.maxKey()
-
-            # t_slices == this TOC's timeout expressed in slices
-            # (fewest number of timeslices that's >= t_secs)
-            t_slices = self._timeout_slices
-
-            # deindex_next == the timeslice of the bucket we need to start
-            # deindexing from
-            deindex_next = self._deindex_next()
-
-            # The ordered set implied by data.keys(deindex_next, pprev) is
-            # a set of all timeslices that may have entries in the index which
-            # are known about by _data, starting from "deindex_next" up to
-            # but not including the current timeslice.  We iterate over
-            # these keys, deindexing buckets as necessary when they're older
-            # than the timeout.
-            # XXX - fixme!  range search doesn't always work (btrees bug)
-            for k in list(data.keys(deindex_next, pprev)):
-                if k < deindex_next:
-                    DEBUG and TLOG(
-                        'broken range search: key %s < min %s'
-                        % (k, deindex_next)
-                        )
-                    continue
-                if k > pprev:
-                    DEBUG and TLOG(
-                        'broken range search: key %s > max %s'
-                        % (k, pprev)
-                        )
-                    continue
-
-                # pthen == the number of seconds elapsed since the timeslice
-                # implied by k
-                pthen = pnow - k
-
-                # slices_since == the number of slices elapsed since the
-                # timeslice implied by k
-                slices_since = pthen / self._period
-
-                # if the number of slices since 'k' is less than the number of
-                # slices that make up the timeout, break out of this loop.
-                # (remember, this is an ordered set, and following keys are
-                # bound to be higher, meaning subsequent tests will also fail,
-                # so we don't even bother checking them)
-                if slices_since < t_slices:
-                    DEBUG and TLOG(
-                        '_getCurrentBucket: slices_since (%s)<t_slices (%s)' %
-                        (slices_since, t_slices))
-                    break
-
-                # if the bucket has keys, deindex them and add them to the
-                # notify queue (destruction notification happens during
-                # garbage collection)
-                bucket = data.get(k, _marker)
-                if bucket is _marker:
-                    DEBUG and TLOG(
-                        'data IOBTree lied about keys: %s doesnt exist' % k
-                        )
-                    continue
-                
-                keys = list(bucket.keys())
-                for key in keys:
-                    ob = bucket.get(key, _marker)
-                    if ob is _marker:
-                        DEBUG and TLOG(
-                            'bucket OOBTree lied about keys: %s doesnt exist' %
-                            key
-                            )
-                        continue
-                    self.notify_queue.put((key, ob))
-                DEBUG and TLOG(
-                    '_getCurrentBucket: deindexing keys %s' % keys
-                    )
-                keys and self._deindex(keys)
-                # set the "last deindexed" pointer to k + period
-                deindex_next = k+period
-                self._deindex_next.set(deindex_next)
-
-            # housekeep_elected indicates that this thread was elected to do
-            # housekeeping.  We set it off initially and only set it true if
-            # we "win the roll". The "roll" is necessary to avoid a conflict
-            # scenario where more than one thread tries to do housekeeping at
-            # the same time.
-            housekeep_elected = 0
-
-            # We ask this thread to "roll the dice." If it wins, it gets
-            # elected to do housekeeping
-            housekeep_elected = self._roll(pnow, pmax)
-            housekeep_elected and DEBUG and TLOG('housekeep elected')
-
-            # if we were elected to do housekeeping, do it now.
-            if housekeep_elected:
-
-                # available_spares == the number of "spare" ("clean", "future")
-                # buckets that exist in "_data"
-                available_spares = (pmax-pnow) / period
-                DEBUG and TLOG(
-                    '_getCurrentBucket: available_spares %s' % available_spares
-                    )
-
-                # delete_end == the last bucket we want to destroy
-                delete_end = deindex_next - period
-
-                # min_spares == minimum number of spare buckets acceptable
-                # by this TOC
-                min_spares = self._min_spare_buckets
-
-                if available_spares < min_spares:
-                    DEBUG and TLOG(
-                        '_getCurrentBucket: available_spares < min_spares'
-                        )
-                    # the first bucket we want to begin creating
-                    replentish_start = pmax + period
-                    try:
-                        self.replentish_queue.put_nowait(replentish_start)
-                    except Queue.Full:
-                        DEBUG and TLOG(
-                            '_getCurrentBucket: replentish queue full'
-                            )
-                self._housekeep(delete_end)
-
-            # finally, bump the last_timeslice housekeeping counter and return
-            # the current bucket
-            self._last_timeslice.set(pnow)
-            return pnow
-        finally:
-            self.lock.release()
-
-    def _roll(self, pnow, pmax):
-        """
-        Roll the dice to see if we're the lucky thread that does
-        housekeeping.  This method is guaranteed to return true at
-        some point as the difference between pnow and pmax naturally
-        diminishes to zero.
-
-        The reason we do the 'random' dance in the last part of this
-        is to minimize the chance that two threads will attempt to
-        do housekeeping at the same time (causing conflicts and double-
-        notifications).
-        """
-        period = self._period
-        low = pnow/period
-        high = pmax/period
-        if high <= low:
-            # we really need to win this roll because we have no
-            # spare buckets (and no valid values to provide to randrange), so
-            # we rig the toss.
-            DEBUG and TLOG("_roll: rigged toss")
-            return 1
-        else:
-            # we're not in an emergency bucket shortage, so we can take
-            # our chances during the roll.  It's highly unlikely that two
-            # threads will win the roll simultaneously, so we avoid a certain
-            # class of conflicts here.
-            if random.randrange(low, high) == low: # WINNAH!
-                DEBUG and TLOG("_roll: roll winner")
-                return 1
-        DEBUG and TLOG("_roll: roll loser")
-        return 0
-
-    def _housekeep(self, delete_end):
-        """ do garbage collection, bucket replentishing and notification """
-        data = self._data
-        period = self._period
-        min_spares = self._min_spare_buckets
-        DEBUG and TLOG(
-            '_housekeep: current slice %s' % self._getCurrentTimeslice()
-            )
-        notify = {}
-        while 1:
-            try:
-                k, v = self.notify_queue.get_nowait()
-                # duplicates will be ignored
-                notify[k] = v
-            except Queue.Empty:
-                break
-
-        to_notify = notify.values()
-        # if we have transient objects to notify about destruction, notify
-        # them (only once, that's why we use a queue) ("notification")
-        if to_notify:
-            DEBUG and TLOG('_housekeep: notifying: %s' % notify.keys())
-            self.notifyDestruct(to_notify)
-
-        # extend _data with extra buckets if necessary ("bucket replentishing")
-        try:
-            replentish_start = self.replentish_queue.get_nowait()
-            DEBUG and TLOG('_housekeep: replentishing')
-            new_bucket_keys=self._getTimeslices(replentish_start, min_spares)
-            DEBUG and TLOG('_housekeep: new_bucket_keys = %s '%new_bucket_keys)
-            for i in new_bucket_keys:
-                if data.has_key(i):
-                    continue
-                data[i] = OOBTree()
-        except Queue.Empty:
-            DEBUG and TLOG('replentish queue empty')
-
-        # gc the stale buckets at the "beginning" of _data ("garbage collect")
-        # iterate over the keys in data that have no minimum value and
-        # a maximum value of delete_end (note: ordered set)
-        # XXX- fixme.  range search doesn't always work (btrees bug)
-        for k in list(data.keys(None, delete_end)):
-            if k > delete_end:
-                DEBUG and TLOG(
-                    '_housekeep: broken range search (key %s > max %s)'
-                    % (k, delete_end)
-                    )
-                continue
-            bucket = data.get(k, _marker)
-            if bucket is _marker:
-                DEBUG and TLOG(
-                    'bucket OOBTree lied about keys: %s doesnt exist' % k
-                    )
-                continue
-            # delete the bucket from _data
-            del data[k]
-            DEBUG and TLOG('_housekeep: deleted data[%s]' % k)
-
-    def _deindex(self, keys):
-        """ Iterate over 'keys' and remove any that match from our index """
-        self.lock.acquire()
-        try:
-            index = self._getIndex()
-            for k in keys:
-                if index.has_key(k):
-                    DEBUG and TLOG('_deindex: deleting %s' % k)
-                    self.__len__.change(-1)
-                    del index[k]
-        finally:
-            self.lock.release()
-
-    def __setitem__(self, k, v):
-        self.lock.acquire()
-        try:
-            notfound = []
-            current = self._getCurrentBucket()
-            index = self._getIndex()
-            b = index.get(k, notfound)
-            if b is notfound:
-                # if this is a new item, we do OOM protection before actually
-                # adding it to ourselves.
-                li = self._limit
-                if li and len(self) >= li:
-                    LOG('Transience', WARNING,
-                        ('Transient object container %s max subobjects '
-                         'reached' % self.id)
-                        )
-                    raise MaxTransientObjectsExceeded, (
-                     "%s exceeds maximum number of subobjects %s" %
-                     (len(self), li))
-                # do length accounting
-                try: self.__len__.change(1)
-                except AttributeError: pass
-            elif b != current:
-                # this is an old key that isn't in the current bucket.
-                if self._data[b].has_key(k):
-                    del self._data[b][k] # delete it from the old bucket
-
-            # change the value
-            DEBUG and TLOG('setitem: setting current[%s]=%s' % (k,v))
-            self._data[current][k] = v
-            # change the TO's last accessed time
-            if hasattr(v, 'setLastAccessed'):
-                v.setLastAccessed()
-            # set the index up with the current bucket for this key
-            index[k] = current
-
-        finally:
-            self.lock.release()
-
-    def __getitem__(self, k):
-        self.lock.acquire()
-        try:
-            # we dont want to call getCurrentBucket here because we need to
-            # be able to raise a KeyError.  The housekeeping steps
-            # performed in the getCurrentBucket method would be ignored
-            # if we raised a KeyError.
-            index = self._getIndex()
-            # the next line will raise the proper error if the item has expired
-            b = index[k]
-            v = self._data[b][k]
-            if hasattr(v, '__of__'):
-                return v.__of__(self)
-            else:
-                return v
-        finally:
-            self.lock.release()
-
-    def __delitem__(self, k):
-        self.lock.acquire()
-        try:
-            self._getCurrentBucket()
-            index = self._getIndex()
-            b = index[k]
-            v = self._data[b][k]
-            del self._data[b][k]
-            self.__len__.change(-1)
-            if hasattr(v, '__of__'):
-                v = v.__of__(self)
-            del index[k]
-        finally:
-            self.lock.release()
-        self.notifyDestruct(v)
-
-
-    security.declareProtected(ACCESS_TRANSIENTS_PERM, 'get')
-    def get(self, k, default=_marker):
-        self.lock.acquire()
-        try:
-            DEBUG and TLOG('get: called with k=%s' % k)
-            notfound = []
-            current = self._getCurrentBucket()
-            DEBUG and TLOG('get: current is %s' % current)
-            if default is _marker: default=None
-            index = self._getIndex()
-            b = index.get(k, notfound)
-            if b is notfound:
-                # it's not here, this is a genuine miss
-                DEBUG and TLOG('bucket was notfound for %s' %k)
-                return default
-            else:
-                v = self._data[b].get(k, notfound)
-                if v is notfound:
-                    DEBUG and TLOG(
-                        'get: %s was not found in index bucket (%s)' % (k, b))
-                    return default
-                elif b != current:
-                    DEBUG and TLOG('get: b was not current, it was %s' %b)
-                    # we accessed the object, so it becomes current
-                    # by moving it to the current bucket
-                    del self._data[b][k] # delete the item from the old bucket.
-                    self._data[current][k] = v # add the value to the current
-                    self._setLastAccessed(v)
-                    index[k] = current # change the index to the current buck.
-
-                if hasattr(v, '__of__'):
-                    v = v.__of__(self)
-                return v
-        finally:
-            self.lock.release()
-
-    def _setLastAccessed(self, transientObject):
-        self.lock.acquire()
-        try:
-            sla = getattr(transientObject, 'setLastAccessed', None)
-            if sla is not None: sla()
-        finally:
-            self.lock.release()
-
-    security.declareProtected(ACCESS_TRANSIENTS_PERM, 'has_key')
-    def has_key(self, k):
-        notfound = []
-        v = self.get(k, notfound)
-        if v is notfound: return 0
-        return 1
-
-    def values(self):
-        # sloppy and loving it!
-        # we used to use something like:
-        # [ self[x] for x in self.keys() ]
-        # but it was causing KeyErrors in getitem's "v = self._data[b][k]"
-        # due to some synchronization problem that I don't understand.
-        # since this is a utility method, I don't care too much. -cm
-        l = []
-        notfound = []
-        for k, t in self._index.items():
-            bucket = self._data.get(t, notfound)
-            if bucket is notfound:
-                continue
-            value = bucket.get(k, notfound)
-            if value is notfound:
-                continue
-            if hasattr(value, '__of__'):
-                value = value.__of__(self)
-            l.append(value)
-        return l
-
-    def items(self):
-        # sloppy and loving it!
-        # we used to use something like:
-        # [ (x, self[x]) for x in self.keys() ]
-        # but it was causing KeyErrors in getitem's "v = self._data[b][k]"
-        # due to some synchronization problem that I don't understand.
-        # since this is a utility method, I don't care too much. -cm
-        l = []
-        notfound = []
-        for k, t in self._index.items():
-            bucket = self._data.get(t, notfound)
-            if bucket is notfound:
-                continue
-            value = bucket.get(k, notfound)
-            if value is notfound:
-                continue
-            if hasattr(value, '__of__'):
-                value = value.__of__(self)
-            l.append((k, value))
-        return l
-
-    def true_items(self):
-        l = []
-        for bucket in self._data.values():
-            items = list(bucket.items())
-            l.extend(items)
-        return l
-
-    def keys(self):
-        self._getCurrentBucket()
-        index = self._getIndex()
-        return list(index.keys())
-
-    # proxy security declaration
-    security.declareProtected(ACCESS_TRANSIENTS_PERM, 'getLen')
+    
+def getCurrentTimeslice():
+    """
+    Return an integer representing the 'current' timeslice.
+    The current timeslice is guaranteed to be the same integer
+    within a 'slice' of time based on a divisor of 'period'.
+    'period' is the number of seconds in a slice.
+    """
+    now = time.time()
+    low = int(math.floor(now)) - PERIOD + 1
+    high = int(math.ceil(now)) + 1
+    for x in range(low, high):
+        if x % PERIOD == 0:
+            return x
+
+def getTimeslices(begin, n):
+    """ Get a list of future timeslice integers of 'n' size in descending
+    order """
+    l = []
+    for x in range(n):
+        l.insert(0, begin + (x * PERIOD))
+    return l
+
+def _assert(case):
+    if not case:
+        raise AssertionError
 
 class Increaser(Persistent):
     """
@@ -920,37 +623,9 @@
         return self.value
 
     def _p_resolveConflict(self, old, state1, state2):
-        DEBUG and TLOG('Resolving conflict in Increaser')
         if old <= state1 <= state2: return state2
         if old <= state2 <= state1: return state1
         return old
-
-    def _p_independent(self):
-        return 1
-
-class Ring(Persistent):
-    """ ring of buckets.  This class is only kept for backwards-compatibility
-    purposes (Zope 2.5X). """
-    def __init__(self, l, index):
-        if not len(l):
-            raise ValueError, "ring must have at least one element"
-        DEBUG and TLOG('initial _ring buckets: %s' % map(oid, l))
-        self._data = l
-        self._index = index
-
-    def __repr__(self):
-        return repr(self._data)
-
-    def __len__(self):
-        return len(self._data)
-
-    def __getitem__(self, i):
-        return self._data[i]
-
-    def turn(self):
-        last = self._data.pop(-1)
-        self._data.insert(0, last)
-        self._p_changed = 1
 
     def _p_independent(self):
         return 1




More information about the Zope-Checkins mailing list