[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - Full.py:1.54

Barry Warsaw <barry@wooz.org>
Tue, 19 Nov 2002 15:13:54 -0500


Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv31204

Modified Files:
	Full.py 
Log Message:
_setupDBs(): The oidqueue table used to do double duty, serving both
the mark phase and the sweep phase of pack.  That doesn't play nicely
with the escape hatch for pack: when we raise PackStop, it's fine to
truncate the mark queue, but not the sweep queue.  So the latter has
been split out into a new delqueue table.
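
In rough outline, the asymmetry looks like this (a minimal sketch
using plain Python lists and illustrative names; the real tables are
Berkeley DB_QUEUE databases):

    class PackStop(Exception):
        """Raised to abandon a pack run once stop() is requested."""

    class TwoQueueSketch:
        def __init__(self):
            self._oidqueue = []  # mark phase: oids still to be traversed
            self._delqueue = []  # sweep phase: oids condemned for deletion
            self._stop = False

        def _mark(self, root, get_refs):
            # Breadth-first traversal from the root.
            seen = {}
            self._oidqueue.append(root)
            while self._oidqueue:
                if self._stop:
                    # Safe to truncate: this is only unfinished traversal
                    # work, which the next pack run redoes from the root.
                    del self._oidqueue[:]
                    raise PackStop('stopped in _mark()')
                oid = self._oidqueue.pop(0)
                if oid not in seen:
                    seen[oid] = 1
                    self._oidqueue.extend(get_refs(oid))
            return seen

        def _collect_objs(self, delete):
            while self._delqueue:
                if self._stop:
                    # Not safe to truncate: these deletions were already
                    # decided, so the queue must survive for a later run.
                    raise PackStop('stopped in _collect_objs()')
                delete(self._delqueue.pop(0))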

Also, use the new extended _setupDB() arguments for the DB_QUEUE
tables.
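
BerkeleyBase.py is not part of this checkin, so the exact signature is
an assumption, but judging from the call sites in the diff below it
presumably looks something like (the _tables list is hypothetical):

    from bsddb3 import db

    class BerkeleyBaseSketch:
        # Hypothetical reconstruction of the extended helper; the real
        # method (and the attributes it uses) live in BerkeleyBase.py.
        def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
            """Open an individual table and remember its handle.

            flags  -- e.g. db.DB_DUP for tables allowing duplicate keys
            dbtype -- db.DB_BTREE by default, db.DB_QUEUE for queues
            reclen -- fixed record length, required for DB_QUEUE tables
            """
            d = db.DB(self._env)
            if flags:
                d.set_flags(flags)
            if reclen is not None:
                d.set_re_len(reclen)
            d.open(self._prefix + name, dbtype, db.DB_CREATE)
            # Remembering the handle lets the base class close() clean
            # up every table without help from the subclass.
            self._tables.append(d)
            return d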

close(): We can simplify this considerably, and make it more robust
when new tables are added, by relying on the fact that the base class
maintains its own list of opened tables and that its close() method
closes each of them in turn.
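
The base class side of that contract isn't in this diff either;
presumably it amounts to little more than the following (again an
assumption about BerkeleyBase.py, using the same hypothetical _tables
list as in the sketch above):

    def close(self):
        # Close every table opened through _setupDB(), then shut down
        # the environment; subclasses need not enumerate their tables.
        for d in self._tables:
            d.close()
        self._env.close()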

autopack(): Default the gc argument to False.

_collect_revs(), _collect_objs(), _mark(), _sweep(): Add an escape
hatch for the pack operation inside the inner loops of each of these
methods.  That way we don't have to wait until the loops finish to
exit the pack operation when stop() has been requested by the main
thread.
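
The per-iteration check is visible in each hunk below; as a
self-contained illustration of why it matters (toy code, not the
storage API):

    import threading
    import time

    class PackStop(Exception):
        pass

    class StoppableWorker(threading.Thread):
        # Toy model of the escape hatch: check the stop flag on every
        # iteration of the inner loop, not just when a phase completes.
        def __init__(self, items):
            threading.Thread.__init__(self)
            self._items = items
            self._stop = False

        def run(self):
            try:
                for item in self._items:
                    if self._stop:
                        raise PackStop('stopped mid-loop')
                    time.sleep(0.01)  # stand-in for real per-record work
            except PackStop:
                pass  # abandon this pass; cleanup would happen here

        def stop(self):
            self._stop = True

    worker = StoppableWorker(range(1000))
    worker.start()
    worker.stop()  # takes effect within one iteration, not one full pass
    worker.join()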

AUTOPACK_CHECK_SLEEP removed.

_Autopack: Use the _WorkThread base class.
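
_WorkThread lives in BerkeleyBase.py and isn't shown in this diff;
judging from how _Autopack uses it below, its contract is presumably
along these lines (a reconstruction, not the actual code):

    import threading
    import time

    class _WorkThreadSketch(threading.Thread):
        def __init__(self, storage, frequency, name):
            threading.Thread.__init__(self)
            self._storage = storage
            self._frequency = frequency
            self._name = name
            self._stop = False

        def run(self):
            nextwork = time.time() + self._frequency
            while not self._stop:
                now = time.time()
                if now > nextwork:
                    self._dowork(now)
                    nextwork = now + self._frequency
                # Sleep in short increments so stop() stays responsive;
                # this subsumes the old AUTOPACK_CHECK_SLEEP logic.
                time.sleep(1)

        def stop(self):
            self._stop = True

        def _dowork(self, now):
            raise NotImplementedError  # overridden by e.g. _Autopack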


=== ZODB3/bsddb3Storage/bsddb3Storage/Full.py 1.53 => 1.54 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Full.py:1.53	Fri Nov 15 12:37:18 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Full.py	Tue Nov 19 15:13:54 2002
@@ -39,7 +39,7 @@
 # the Full and Minimal implementations.  It in turn inherits from
 # ZODB.BaseStorage.BaseStorage which itself provides some common storage
 # functionality.
-from BerkeleyBase import BerkeleyBase
+from BerkeleyBase import BerkeleyBase, PackStop, _WorkThread
 
 ABORT = 'A'
 COMMIT = 'C'
@@ -51,12 +51,6 @@
 # DEBUGGING
 #DNE = 'nonexist'
 
-# Number of seconds for the autopack thread to sleep before checking to see if
-# it's time for another autopack run.  Lower numbers mean more processing,
-# higher numbers mean less responsiveness to shutdown requests.  10 seconds
-# seems like a good compromise.
-AUTOPACK_CHECK_SLEEP = 10
-
 try:
     # Python 2.2
     from _helper import incr
@@ -208,6 +202,11 @@
         #     object is first written to a version, no entry is written here.
         #     We do write an entry when we commit or abort the version.
         #
+        # delqueue -- [oid]
+        #     This is also a Queue, not a BTree.  It is used during pack to
+        #     list objects for which no more references exist, such that the
+        #     objects can be completely packed away.
+        #
         # packmark -- [oid]
         #     Every object reachable from the root during a classic pack
         #     operation will have its oid present in this table.
@@ -215,9 +214,6 @@
         # oidqueue -- [oid]
         #     This table is a Queue, not a BTree.  It is used during the mark
         #     phase of pack() and contains a list of oids for work to be done.
-        #     It is also used during pack to list objects for which no more
-        #     references exist, such that the objects can be completely packed
-        #     away.
         #
         self._serials = self._setupDB('serials', db.DB_DUP)
         self._pickles = self._setupDB('pickles')
@@ -239,16 +235,13 @@
         self._objrevs = self._setupDB('objrevs', db.DB_DUP)
         self._packmark = self._setupDB('packmark')
         self._packtime = self._setupDB('packtime')
-        self._oidqueue = db.DB(self._env)
-        self._oidqueue.set_re_len(8)
-        # BAW: do we need to set the queue extent size?
-        self._oidqueue.open(self._prefix + 'oidqueue',
-                            db.DB_QUEUE, db.DB_CREATE)
+        self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 8)
+        self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
         # Do recovery and consistency checks
         self._withlock(self._dorecovery)
         # Set up the autopacking thread
-        if self._config.frequency > 0:
-            config = self._config
+        config = self._config
+        if config.frequency > 0:
             lastpacktime = U64(self._last_packtime())
             self._autopacker = _Autopack(
                 self, config.frequency,
@@ -304,35 +297,22 @@
             self.__ltid = ZERO
 
     def close(self):
-        # We must stop the autopacker first before closing any tables.  BAW:
-        # should we use a timeout on the join() call?  I'm not sure.  On the
-        # one hand we don't want to block forever, but on the other, killing
-        # the autopacker thread in the middle of real work could leave the
-        # databases in a corrupted state, requiring recovery.  With a
-        # AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
-        if self._autopacker:
-            zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
-            self._autopacker.stop()
-            self._autopacker.join()
-        self._serials.close()
-        self._pickles.close()
-        self._refcounts.close()
-        self._oids.close()
-        self._pvids.close()
-        self._prevrevids.close()
-        self._pending.close()
-        self._vids.close()
-        self._versions.close()
-        self._currentVersions.close()
-        self._metadata.close()
-        self._txnMetadata.close()
-        self._txnoids.close()
-        self._pickleRefcounts.close()
-        self._objrevs.close()
-        self._packtime.close()
-        self._packmark.close()
-        self._oidqueue.close()
-        BerkeleyBase.close(self)
+        # Set this flag before acquiring the lock so we don't block waiting
+        # for the autopack thread to give up the lock.
+        self._stop = True
+        self._lock_acquire()
+        try:
+            # We must stop the autopacker and checkpointing threads first
+            # before closing any tables.  I'm not sure about the join()
+            # timeout, but I'd be surprised if any particular iteration of
+            # the pack-related loops takes longer than a few seconds.
+            if self._autopacker:
+                zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
+                self._autopacker.stop()
+                self._autopacker.join(30)
+            BerkeleyBase.close(self)
+        finally:
+            self._lock_release()
 
     def _doabort(self, txn, tid):
         # First clean up the oid indexed (or oid+tid indexed) tables.
@@ -1401,9 +1381,7 @@
             self._withtxn(self._collect_revs, packtid)
         finally:
             self._lock_release()
-        # Collect any objects with refcount zero.  We do this before the mark
-        # and sweep because we're sharing the oidqueue table for two different
-        # purposes.
+        # Collect any objects with refcount zero.
         self._lock_acquire()
         try:
             self._withtxn(self._collect_objs)
@@ -1436,7 +1414,12 @@
         finally:
             self._lock_release()
 
-    def autopack(self, t, gc):
+    def autopack(self, t, gc=False):
+        """Perform an autopack pass.
+
+        Autopacking is different from classic pack() in that it doesn't do
+        cyclic garbage detection unless the gc flag is True.
+        """
         zLOG.LOG('Full storage', zLOG.INFO,
                  'autopack started (packtime: %s, gc? %s)'
                  % (t, gc and 'yes' or 'no'))
@@ -1460,6 +1443,8 @@
             ct = self._txnoids.cursor(txn=txn)
             rec = co.first()
             while rec:
+                if self._stop:
+                    raise PackStop, 'stopped in _collect_revs()'
                 revid, oldserial = rec
                 newserial = revid[:8]
                 oid = revid[8:]
@@ -1526,11 +1511,13 @@
             else:
                 # This object is no longer referenced by any other object in
                 # the system.  We can collect all traces of it.
-                self._oidqueue.append(oid, txn)
+                self._delqueue.append(oid, txn)
 
     def _collect_objs(self, txn):
-        orec = self._oidqueue.consume()
+        orec = self._delqueue.consume()
         while orec:
+            if self._stop:
+                raise PackStop, 'stopped in _collect_objs()'
             oid = orec[1]
             # Delete the object from the serials table
             c = self._serials.cursor(txn)
@@ -1540,6 +1527,8 @@
                 except db.DBNotFoundError:
                     rec = None
                 while rec and rec[0] == oid:
+                    if self._stop:
+                        raise PackStop, 'stopped in _collect_objs() loop 1'
                     c.delete()
                     rec = c.next_dup()
                 # We don't need the refcounts any more, but note that if the
@@ -1559,6 +1548,8 @@
                 except db.DBNotFoundError:
                     rec = None
                 while rec and rec[0][:8] == oid:
+                    if self._stop:
+                        raise PackStop, 'stopped in _collect_objs() loop 2'
                     revid, metadata = rec
                     tid = revid[8:]
                     c.delete()
@@ -1587,8 +1578,8 @@
                 c.close()
             # We really do want this down here, since _decrefPickle() could
             # add more items to the queue.
-            orec = self._oidqueue.consume()
-        assert len(self._oidqueue) == 0
+            orec = self._delqueue.consume()
+        assert len(self._delqueue) == 0
 
     def _findrev(self, oid, packtid, txn):
         # BAW: Maybe this could probably be more efficient by not doing so
@@ -1630,6 +1621,8 @@
         # root references, and then for each of those, find all the objects it
         # references, and so on until we've traversed the entire object graph.
         while oid:
+            if self._stop:
+                raise PackStop, 'stopped in _mark()'
             if not self._packmark.has_key(oid):
                 # We haven't seen this object yet
                 self._packmark.put(oid, PRESENT, txn=txn)
@@ -1659,6 +1652,8 @@
         try:
             rec = c.first()
             while rec:
+                if self._stop:
+                    raise PackStop, 'stopped in _sweep()'
                 oid = rec[0]
                 rec = c.next()
                 serial, tid = self._getSerialAndTid(oid)
@@ -1671,7 +1666,7 @@
                 # reachable objects) doesn't have a record for this guy, then
                 # we can zap it.  Do so by appending to oidqueue.
                 if not self._packmark.has_key(oid):
-                    self._oidqueue.append(oid, txn)
+                    self._delqueue.append(oid, txn)
         finally:
             c.close()
         # We're done with the mark table
@@ -1878,39 +1873,23 @@
 
 
 
-class _Autopack(threading.Thread):
+class _Autopack(_WorkThread):
     def __init__(self, storage, frequency, packtime, classicpack,
                  lastpacktime):
-        threading.Thread.__init__(self)
-        self._storage = storage
-        self._frequency = frequency
+        _WorkThread.__init__(self, storage, frequency, 'autopacking')
         self._packtime = packtime
         self._classicpack = classicpack
         # Bookkeeping
         self._stop = False
-        self._nextpack = lastpacktime + self._frequency
         self._lastclassic = 0
 
-    def run(self):
-        zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread started')
-        while not self._stop:
-            now = time.time()
-            if now > self._nextpack:
-                # Should we do a classic pack this time?
-                if self._classicpack <= 0:
-                    classicp = False
-                else:
-                    v = (self._lastclassic + 1) % self._classicpack
-                    self._lastclassic = v
-                    classicp = not v
-                # Run the autopack phase
-                self._storage.autopack(now - self._packtime, classicp)
-                self._nextpack = now + self._frequency
-            # Now we sleep for a little while before we check again.  Sleep
-            # for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so
-            # as to be as responsive as possible to .stop() calls.
-            time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
-        zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread finished')
-
-    def stop(self):
-        self._stop = True
+    def _dowork(self, now):
+        # Should we do a classic pack this time?
+        if self._classicpack <= 0:
+            classicp = False
+        else:
+            v = (self._lastclassic + 1) % self._classicpack
+            self._lastclassic = v
+            classicp = not v
+        # Run the autopack phase
+        self._storage.autopack(now - self._packtime, classicp)