[Zope-Checkins] CVS: Products/Transience - TransactionHelper.py:1.1.2.1 Transience.py:1.32.12.8.2.1

Chris McDonough chrism at plope.com
Sat Sep 11 21:00:02 EDT 2004


Update of /cvs-repository/Products/Transience
In directory cvs.zope.org:/tmp/cvs-serv28212

Modified Files:
      Tag: chrism-pre273-branch
	Transience.py 
Added Files:
      Tag: chrism-pre273-branch
	TransactionHelper.py 
Log Message:
Add pre-2.7.3 changes on branch for review.


=== Added File Products/Transience/TransactionHelper.py ===
import time

class PreventTransactionCommit(Exception):
    """ Raised at tpc_vote time to veto the commit of a transaction """

class UncommittableJar:
    """ A jar that cannot be committed """
    def __init__(self, reason):
        self.reason = reason
        self.time = time.time()

    def sort_key(self):
        # self.time is a timestamp (a float), not a callable
        return self.time

    def tpc_begin(self, transaction):
        pass

    def commit(self, obj, transaction):
        pass

    def tpc_vote(self, transaction):
        raise PreventTransactionCommit(self.reason)

class makeTransactionUncommittable:
    """
    Register an uncommittable object with the provided transaction,
    preventing that transaction from ever being committed.
    """
    def __init__(self, transaction, reason):
        self._p_jar = UncommittableJar(reason)
        transaction.register(self)

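For reviewers: a minimal usage sketch of the helper above (hypothetical,
not part of this checkin).  It assumes the ZODB 3.2-era get_transaction()
builtin, and the function name is made up for illustration:

    from TransactionHelper import makeTransactionUncommittable

    def poison_current_transaction(reason):
        # Register the poison-pill jar with the current transaction.
        # get_transaction() is a builtin under ZODB 3.2; later ZODBs
        # spell this transaction.get() instead.
        t = get_transaction()
        makeTransactionUncommittable(t, reason)
        # From here on, t.commit() raises PreventTransactionCommit at
        # tpc_vote time; t.abort() is the only way forward.

The veto works because ZODB's two-phase commit aborts the whole
transaction when any registered jar raises during the vote phase.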

=== Products/Transience/Transience.py 1.32.12.8 => 1.32.12.8.2.1 ===
--- Products/Transience/Transience.py:1.32.12.8	Sat Jul  3 20:21:01 2004
+++ Products/Transience/Transience.py	Sat Sep 11 20:59:32 2004
@@ -33,10 +33,11 @@
 from BTrees.Length import Length
 from BTrees.OOBTree import OOBTree
 from BTrees.IOBTree import IOBTree
-from ZODB.POSException import ConflictError
+from ZODB.POSException import ConflictError, ReadConflictError
 
 from Persistence import Persistent
 from OFS.SimpleItem import SimpleItem
+from ZPublisher.Publish import Retry
 from AccessControl import ClassSecurityInfo, getSecurityManager
 from AccessControl.SecurityManagement import newSecurityManager, \
      setSecurityManager
@@ -44,6 +45,7 @@
 from zLOG import LOG, WARNING, INFO
 
 from TransientObject import TransientObject
+from TransactionHelper import makeTransactionUncommittable
 from Fake import FakeIOBTree
 
 ADD_CONTAINER_PERM = 'Add Transient Object Container'
@@ -53,7 +55,7 @@
 ACCESS_TRANSIENTS_PERM = 'Access Transient Objects'
 MANAGE_CONTAINER_PERM = 'Manage Transient Object Container'
 
-SPARE_BUCKETS = 15 # minimum number of buckets to keep "spare"
+SPARE_BUCKETS = 1 # minimum number of buckets to keep "spare"
 BUCKET_CLASS = OOBTree # constructor for buckets
 DATA_CLASS = IOBTree # const for main data structure (timeslice->"bucket")
 STRICT = os.environ.get('Z_TOC_STRICT', '')
@@ -206,7 +208,7 @@
         # We make enough buckets initially to last us a while, and
         # we subsequently extend _data with fresh buckets and remove old
         # buckets as necessary during normal operations (see
-        # _gc() and _replentish()).
+        # _replentish() and _gc()).
         self._data = DATA_CLASS()
 
         # populate _data with some number of buckets, each of which
@@ -232,6 +234,10 @@
         # each expired item.
         self._last_finalized_timeslice = Increaser(-self._period)
 
+        # '_last_gc_timeslice' records the timeslice in which the
+        # garbage collection process last ran.
+        self._last_gc_timeslice = Increaser(-self._period)
+
         # our "_length" is the number of "active" data objects in _data.
         # it does not include items that are still kept in _data but need to
         # be garbage collected.
@@ -269,15 +275,10 @@
             bucket = self._data.get(0)
             return bucket.get(k, default)
 
-        # always call finalize
+        # do housekeeping
         self._finalize(current_ts)
-
-        # call gc and/or replentish on an only-as needed basis
-        if self._roll(current_ts, 'replentish'):
-            self._replentish(current_ts)
-
-        if self._roll(current_ts, 'gc'):
-            self._gc(current_ts)
+        self._replentish(current_ts)
+        self._gc(current_ts)
 
         # SUBTLETY ALERT TO SELF: do not "improve" the code below
         # unnecessarily, as it will end only in tears.  The lack of aliases
@@ -348,13 +349,10 @@
         else:
             current_ts = 0
 
+        # do housekeeping
         self._finalize(current_ts)
-
-        if self._roll(current_ts, 'replentish'):
-            self._replentish(current_ts)
-
-        if self._roll(current_ts, 'gc'):
-            self._gc(current_ts)
+        self._replentish(current_ts)
+        self._gc(current_ts)
 
         STRICT and _assert(self._data.has_key(current_ts))
         current = self._getCurrentSlices(current_ts)
@@ -374,8 +372,8 @@
     def keys(self):
         return self._all().keys()
 
-    def rawkeys(self, current_ts):
-        # for debugging
+    def raw(self, current_ts):
+        # for debugging and unit testing
         current = self._getCurrentSlices(current_ts)
 
         current.reverse() # overwrite older with newer
@@ -496,40 +494,11 @@
         DEBUG and TLOG('has_key: returning false from for %s' % k)
         return False
 
-    def _roll(self, now, reason):
-        """
-        Roll the dice to see if we're the lucky thread that does
-        bucket replentishment or gc.  This method is guaranteed to return
-        true at some point as the difference between high and low naturally
-        diminishes to zero.
-
-        The reason we do the 'random' dance in the last part of this
-        is to minimize the chance that two threads will attempt to
-        do housekeeping at the same time (causing conflicts).
-        """
-        low = now/self._period
-        high = self._max_timeslice()/self._period
-        if high <= low:
-            # we really need to win this roll because we have no
-            # spare buckets (and no valid values to provide to randrange), so
-            # we rig the toss.
-            DEBUG and TLOG('_roll: %s rigged toss' % reason)
-            return True
-        else:
-            # we're not in an emergency bucket shortage, so we can
-            # take our chances during the roll.  It's unlikely that
-            # two threads will win the roll simultaneously, so we
-            # avoid a certain class of conflicts here.
-            if random.randrange(low, high) == low: # WINNAH!
-                DEBUG and TLOG("_roll: %s roll winner" % reason)
-                return True
-        DEBUG and TLOG("_roll: %s roll loser" % reason)
-        return False
-
     def _get_max_expired_ts(self, now):
         return now - (self._period * (self._timeout_slices + 1))
 
     def _finalize(self, now):
+        """ Call finalization handlers for the data in each stale bucket """
         if not self._timeout_slices:
             DEBUG and TLOG('_finalize: doing nothing (no timeout)')
             return # don't do any finalization if there is no timeout
@@ -555,18 +524,8 @@
                 now = getCurrentTimeslice(self._period) # for unit tests
 
             # we want to start finalizing from one timeslice after the
-            # timeslice which we last finalized.  Note that finalizing
-            # an already-finalized bucket somehow sends persistence
-            # into a spin with an exception later raised:
-            # "SystemError: error return without exception set",
-            # typically coming from
-            # Products.Sessions.SessionDataManager, line 182, in
-            # _getSessionDataObject (if getattr(ob, '__of__', None)
-            # and getattr(ob, 'aq_parent', None)). According to this
-            # email message from Jim, it may be because the ob is
-            # ghosted and doesn't have a _p_jar somehow:
-            #http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
-
+            # timeslice which we last finalized.
+
             start_finalize  = self._last_finalized_timeslice() + self._period
 
             # we want to finalize only up to the maximum expired timeslice
@@ -612,63 +571,129 @@
             self.finalize_lock.release()
 
     def _replentish(self, now):
-        # available_spares == the number of "spare" buckets that exist in
-        # "_data"
+        """ Add 'fresh' future or current buckets """
         if not self._timeout_slices:
             return # do nothing if no timeout
         
+        max_ts = self._max_timeslice()
+
+        low = now/self._period
+        high = max_ts/self._period
+
+        # the difference between high and low naturally diminishes to
+        # zero as now approaches self._max_timeslice() during normal
+        # operations.  If high <= low, it means we have no current bucket,
+        # so we *really* need to replentish (having a current bucket is
+        # an invariant for continued operation).
+
+        optional = not (high <= low)
+
         if not self.replentish_lock.acquire(0):
-            DEBUG and TLOG('_replentish: couldnt acquire lock')
-            return
 
-        try:
-            max_ts = self._max_timeslice()
-            available_spares = (max_ts-now) / self._period
-            DEBUG and TLOG('_replentish: now = %s' % now)
-            DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
-            DEBUG and TLOG('_replentish: available_spares = %s'
-                           % available_spares)
-
-            if available_spares >= SPARE_BUCKETS:
-                DEBUG and TLOG('_replentish: available_spares (%s) >= '
-                               'SPARE_BUCKETS (%s), doing '
-                               'nothing'% (available_spares,
-                                           SPARE_BUCKETS))
+            if not optional:
+                DEBUG and TLOG('_replentish: no current bucket but cant '
+                               'acquire lock; making txn uncommittable + retry')
+                # Out of paranoia, make this transaction uncommittable;
+                # this may not be necessary.
+                makeTransactionUncommittable(
+                    get_transaction(), "Transience had no current bucket")
+                #time.sleep(random.uniform(0, 1)) # add entropy
+                raise Retry
+
+            else:
+                DEBUG and TLOG('_replentish: couldnt acquire lock, returning')
                 return
 
-            if max_ts < now:
-                replentish_start = now
-                replentish_end = now + (self._period * SPARE_BUCKETS)
+        try:
+            if optional:
+                DEBUG and TLOG('_replentish: attempting optional replentish')
+                # We're not in an emergency bucket shortage, so we don't 
+                # explicitly need to replentish with fresh new buckets.
+                # Minimize the chance that two threads will attempt to
+                # do housekeeping at the same time (which causes conflicts)
+                # by introducing a random element.
+                if random.randrange(low, high) != low: # do nothing
+                    DEBUG and TLOG('_replentish: lost random selection '
+                                   'in optional replentish, returning')
+                    return
+                else:
+                    DEBUG and TLOG('_replentish: won random selection '
+                                   'in optional replentish, continuing')
+                    self._do_replentish_work(now, max_ts)
 
             else:
-                replentish_start = max_ts + self._period
-                replentish_end = max_ts + (self._period * SPARE_BUCKETS)
+                # we're in an emergency bucket shortage; we must replentish
+                DEBUG and TLOG('_replentish: forcing replentish '
+                               '(no current bucket)')
+                self._do_replentish_work(now, max_ts)
 
-            DEBUG and TLOG('_replentish: replentish_start = %s' %
-                           replentish_start)
-            DEBUG and TLOG('_replentish: replentish_end = %s'
-                           % replentish_end)
-            # n is the number of buckets to create
-            n = (replentish_end - replentish_start) / self._period
-            new_buckets = getTimeslices(replentish_start, n, self._period)
-            new_buckets.reverse()
-            STRICT and _assert(new_buckets)
-            DEBUG and TLOG('_replentish: adding %s new buckets' % n)
-            DEBUG and TLOG('_replentish: buckets to add = %s'
-                           % new_buckets)
-            for k in new_buckets:
-                STRICT and _assert(not self._data.has_key(k))
-                try:
-                    self._data[k] = BUCKET_CLASS()
-                except ConflictError:
-                    DEBUG and TLOG('_replentish: conflict when adding %s' % k)
-                    time.sleep(random.uniform(0, 1)) # add entropy
-                    raise
-            self._max_timeslice.set(max(new_buckets))
         finally:
             self.replentish_lock.release()
 
+    def _do_replentish_work(self, now, max_ts):
+        # this is only separated from _replentish for readability;
+        # it shouldn't be called without the replentish lock being held
+
+        # available_spares == the number of "spare" buckets that exist in
+        # "_data"
+        available_spares = (max_ts - now) / self._period
+        DEBUG and TLOG('_do_replentish_work: now = %s' % now)
+        DEBUG and TLOG('_do_replentish_work: max_ts = %s' % max_ts)
+        DEBUG and TLOG('_do_replentish_work: available_spares = %s'
+                       % available_spares)
+
+        if available_spares >= SPARE_BUCKETS:
+            DEBUG and TLOG('_do_replentish_work: available_spares (%s) >= '
+                           'SPARE_BUCKETS (%s), doing '
+                           'nothing' % (available_spares,
+                                        SPARE_BUCKETS))
+            return
+
+        if max_ts < now:
+            # the newest bucket in self._data is older than now!
+            replentish_start = now
+            replentish_end = now + (self._period * SPARE_BUCKETS)
+
+        else:
+            replentish_start = max_ts + self._period
+                replentish_end = max_ts + (self._period * (SPARE_BUCKETS + 1))
+
+        DEBUG and TLOG('_do_replentish_work: replentish_start = %s' %
+                       replentish_start)
+        DEBUG and TLOG('_do_replentish_work: replentish_end = %s'
+                       % replentish_end)
+        # n is the number of buckets to create
+        n = (replentish_end - replentish_start) / self._period
+        new_buckets = getTimeslices(replentish_start, n, self._period)
+        new_buckets.reverse()
+        STRICT and _assert(new_buckets)
+        DEBUG and TLOG('_do_replentish_work: adding %s new buckets' % n)
+        DEBUG and TLOG('_do_replentish_work: buckets to add = %s'
+                       % new_buckets)
+        for k in new_buckets:
+            STRICT and _assert(not self._data.has_key(k))
+
+            # this is a conflict hotspot
+            try:
+                self._data[k] = BUCKET_CLASS()
+            except ConflictError:
+                DEBUG and TLOG('_do_replentish_work: conflict when adding %s' %
+                               k)
+                # Out of paranoia, make this transaction uncommittable:
+                # this is a fatal error and the request must be retried to
+                # get back to a sane state, but we haven't set
+                # _max_timeslice yet, so an exception handler that catches
+                # this ConflictError would leave us inconsistent.  Under ZODB
+                # 3.2 this keeps our invariants intact even if it's caught.
+                makeTransactionUncommittable(
+                    get_transaction(),
+                    "conflict error in Transience _do_replentish_work")
+                raise
+
+        self._max_timeslice.set(max(new_buckets))
+
     def _gc(self, now=None):
+        """ Remove stale buckets """
         if not self._timeout_slices:
             return # dont do gc if there is no timeout
 
@@ -676,15 +701,25 @@
             DEBUG and TLOG('_gc: couldnt acquire lock')
             return
 
         try:
             if now is None:
                 now = getCurrentTimeslice(self._period) # for unit tests
 
-            # we want to garbage collect all buckets that have already been run
+            last_gc = self._last_gc_timeslice()
+            gc_every = self._period * SPARE_BUCKETS
+
+            if (now - last_gc) < gc_every:
+                DEBUG and TLOG('_gc: gc attempt not yet required '
+                               '( (%s - %s) < %s )' % (now, last_gc, gc_every))
+                return
+
+            DEBUG and TLOG('_gc: gc attempt proceeding')
+
+            # we garbage collect any buckets that have already been run
             # through finalization
+
             max_ts = self._last_finalized_timeslice()
 
-            DEBUG and TLOG('_gc: now is %s' % now)
             DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
 
             for key in list(self._data.keys(None, max_ts)):
@@ -692,6 +727,9 @@
                 STRICT and _assert(self._data.has_key(key))
                 DEBUG and TLOG('deleting %s from _data' % key)
                 del self._data[key]
+
+            self._last_gc_timeslice.set(now)
+
         finally:
             self.gc_lock.release()
 
@@ -834,8 +872,10 @@
     def nudge(self):
         """ Used by mgmt interface to maybe do housekeeping each time
         a screen is shown """
-        # run garbage collector so view is correct
-        self._gc()
+        now = getCurrentTimeslice(self._period)
+        self._finalize(now)
+        self._replentish(now)
+        self._gc(now)
 
     security.declareProtected(MANAGE_CONTAINER_PERM,
         'manage_changeTransientObjectContainer')
@@ -891,6 +931,10 @@
         if not state.has_key('_last_finalized_timeslice'):
             self._last_finalized_timeslice = Increaser(-self._period)
 
+        # TOCs prior to 2.7.3 didn't have a _last_gc_timeslice
+        if not state.has_key('_last_gc_timeslice'):
+            self._last_gc_timeslice = Increaser(-self._period)
+
         # we should probably delete older attributes from state such as
         # '_last_timeslice', '_deindex_next',and '__len__' here but we leave
         # them in order to allow people to switch between 2.6.0->2.7.0 and
@@ -947,7 +991,7 @@
     def _p_resolveConflict(self, old, state1, state2):
         return max(old, state1, state2)
 
-    def _p_independent(self):
-        return 1
+##     def _p_independent(self):
+##         return 1
 
 Globals.InitializeClass(TransientObjectContainer)
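
For reviewers: the "optional replentish" branch above keeps the spirit of
the removed _roll() method.  A standalone sketch of the idea (illustrative
only; the function name is made up and not part of Transience.py):

    import random

    def should_replentish(now, max_ts, period):
        # One slot number per period: 'low' is the slot for the current
        # time, 'high' is the slot for the newest existing bucket.
        low = now / period
        high = max_ts / period
        if high <= low:
            # No current bucket exists, so an operating invariant is
            # broken and every thread must try to replentish (the
            # "rigged toss" from the old _roll()).
            return True
        # Otherwise each thread wins with probability 1/(high - low),
        # which makes it unlikely that two threads replentish in the
        # same timeslice and write-conflict on the _data BTree.
        return random.randrange(low, high) == low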

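The new _last_gc_timeslice counter rate-limits garbage collection in the
same vein: gc now runs at most once every SPARE_BUCKETS periods instead
of on every access.  Roughly (again illustrative, with a made-up name):

    def gc_is_due(now, last_gc, period, spare_buckets):
        # Skip gc unless at least spare_buckets full periods have
        # elapsed since the last recorded gc timeslice.
        return (now - last_gc) >= (period * spare_buckets)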

