[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - Minimal.py:1.14

Barry Warsaw <barry@wooz.org>
Mon, 11 Nov 2002 15:59:45 -0500


Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv21428

Modified Files:
	Minimal.py 
Log Message:
Added pack and autopack support.  Packing is only necessary to garbage
collect cycles.

Also, some code updates and reorganization to factor code into the base
class.

_dostore(): Support conflict resolution.
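
To illustrate the "packing is only necessary to garbage collect cycles"
point, here is a rough usage sketch (not part of this checkin; the import
paths, the storage constructor argument, and the use of get_transaction()
are assumptions about a ZODB3 setup of this era).  Two objects that
reference each other keep their refcounts above zero even after the root
stops referencing them, so only the mark-and-sweep in pack() reclaims them:

    import time
    from ZODB import DB
    from ZODB.PersistentMapping import PersistentMapping
    from ZODB.referencesf import referencesf
    from bsddb3Storage.Minimal import Minimal

    # The directory name for the BerkeleyDB environment is hypothetical.
    storage = Minimal('minimal-test-db')
    db = DB(storage)
    conn = db.open()
    root = conn.root()

    # Build a two-object cycle reachable from the root.
    a = PersistentMapping()
    b = PersistentMapping()
    a['peer'] = b
    b['peer'] = a
    root['a'] = a
    get_transaction().commit()

    # Detach the cycle from the root.  Each object still references the
    # other, so commit-time refcounting never deletes them.
    del root['a']
    get_transaction().commit()

    # The mark-and-sweep in pack() collects the now-unreachable cycle.
    # Minimal storage ignores the pack time.
    storage.pack(time.time(), referencesf)
    db.close()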


=== ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py 1.13 => 1.14 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py:1.13	Tue Nov  5 18:07:32 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py	Mon Nov 11 15:59:44 2002
@@ -17,6 +17,9 @@
 
 __version__ = '$Revision$'[-2:][0]
 
+import time
+import threading
+
 # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
 # http://pybsddb.sourceforge.net.  It is compatible with release 3.4 of
 # PyBSDDB3.
@@ -29,15 +32,29 @@
 from ZODB import POSException
 from ZODB.utils import U64, p64
 from ZODB.referencesf import referencesf
+from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
+import zLOG
 
 ABORT = 'A'
 COMMIT = 'C'
 PRESENT = 'X'
 ZERO = '\0'*8
 
+# Number of seconds for the autopack thread to sleep before checking to see if
+# it's time for another autopack run.  Lower numbers mean more processing;
+# higher numbers mean less responsiveness to shutdown requests.  10 seconds
+# seems like a good compromise.
+AUTOPACK_CHECK_SLEEP = 10
+
+try:
+    True, False
+except NameError:
+    True = 1
+    False = 0
+
 
 
-class Minimal(BerkeleyBase):
+class Minimal(BerkeleyBase, ConflictResolvingStorage):
     def _setupDBs(self):
         # Data Type Assumptions:
         #
@@ -77,53 +94,74 @@
         #     no pending entry.  It is a database invariant that if the
         #     pending table is empty, the oids table must also be empty.
         #
+        # packmark -- [oid]
+        #     Every object reachable from the root during a classic pack
+        #     operation will have its oid present in this table.
+        #
+        # oidqueue -- [oid]
+        #     This table is a Queue, not a BTree.  It is used during the mark
+        #     phase of pack() and contains a list of oids for work to be done.
+        #     It is also used during pack to list objects for which no more
+        #     references exist, such that the objects can be completely packed
+        #     away.
+        #
         self._serials = self._setupDB('serials', db.DB_DUP)
         self._pickles = self._setupDB('pickles')
         self._refcounts = self._setupDB('refcounts')
         self._oids = self._setupDB('oids')
         self._pending = self._setupDB('pending')
+        # Tables to support packing.
+        self._packmark = self._setupDB('packmark')
+        self._oidqueue = db.DB(self._env)
+        self._oidqueue.set_re_len(8)
+        # BAW: do we need to set the queue extent size?
+        self._oidqueue.open(self._prefix + 'oidqueue',
+                            db.DB_QUEUE, db.DB_CREATE)
         # Do recovery and consistency checks
         pendings = self._pending.keys()
         assert len(pendings) <= 1
         if len(pendings) == 0:
             assert len(self._oids) == 0
-            return
-        # Do recovery
-        tid = pendings[0]
-        flag = self._pending.get(tid)
-        assert flag in (ABORT, COMMIT)
-        self._lock_acquire()
-        try:
-            if flag == ABORT:
-                self._do(self._doabort, tid)
-            else:
-                self._do(self._docommit, tid)
-        finally:
-            self._lock_release()
+        else:
+            # Do recovery
+            tid = pendings[0]
+            flag = self._pending.get(tid)
+            assert flag in (ABORT, COMMIT)
+            self._lock_acquire()
+            try:
+                if flag == ABORT:
+                    self._withtxn(self._doabort, tid)
+                else:
+                    self._withtxn(self._docommit, tid)
+            finally:
+                self._lock_release()
+        # Set up the autopacking thread
+        self._autopacker = None
+        if self._config.frequency > 0:
+            config = self._config
+            self._autopacker = _Autopack(self, config.frequency)
+            self._autopacker.start()
 
     def close(self):
+        # We must stop the autopacker first before closing any tables.  BAW:
+        # should we use a timeout on the join() call?  I'm not sure.  On the
+        # one hand we don't want to block forever, but on the other, killing
+        # the autopacker thread in the middle of real work could leave the
+        # databases in a corrupted state, requiring recovery.  With an
+        # AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
+        if self._autopacker:
+            zLOG.LOG('Minimal storage', zLOG.INFO, 'stopping autopack thread')
+            self._autopacker.stop()
+            self._autopacker.join()
         self._serials.close()
         self._pickles.close()
         self._refcounts.close()
         self._oids.close()
         self._pending.close()
+        self._packmark.close()
+        self._oidqueue.close()
         BerkeleyBase.close(self)
 
-    def _do(self, meth, tid):
-        txn = self._env.txn_begin()
-        try:
-            meth(tid, txn)
-            self._oids.truncate(txn)
-            self._pending.truncate(txn)
-        except:
-            txn.abort()
-            self._docheckpoint()
-            raise
-        else:
-            txn.commit()
-            self._docheckpoint()
-
-    def _doabort(self, tid, txn):
+    def _doabort(self, txn, tid):
         co = cs = None
         try:
             co = self._oids.cursor(txn=txn)
@@ -145,8 +183,14 @@
             # if co.close() were to fail.  In practice this shouldn't happen.
             if co: co.close()
             if cs: cs.close()
+        # We're done with these tables
+        self._oids.truncate(txn)
+        self._pending.truncate(txn)
+
+    def _abort(self):
+        self._withtxn(self._doabort, self._serial)
 
-    def _docommit(self, tid, txn):
+    def _docommit(self, txn, tid):
         deltas = {}
         co = cs = None
         try:
@@ -183,6 +227,9 @@
             # if co.close() were to fail.  In practice this shouldn't happen.
             if co: co.close()
             if cs: cs.close()
+        # We're done with these tables
+        self._oids.truncate(txn)
+        self._pending.truncate(txn)
         # Now, to finish up, we need to apply the refcount deltas to the
         # refcounts table, and do recursive collection of all refcount == 0
         # objects.
@@ -192,9 +239,9 @@
     def _update_refcounts(self, deltas, txn):
         newdeltas = {}
         for oid, delta in deltas.items():
-            rc = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
-            assert rc >= 0
-            if rc == 0:
+            refcount = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
+            assert refcount >= 0
+            if refcount == 0:
                 # The reference count for this object has just gone to zero,
                 # so we can safely remove all traces of it from the serials,
                 # pickles and refcounts table.  Note that before we remove its
@@ -209,7 +256,7 @@
                 self._refcounts.delete(oid, txn=txn)
                 self._pickles.delete(oid+current, txn=txn)
             else:
-                self._refcounts.put(oid, p64(rc), txn=txn)
+                self._refcounts.put(oid, p64(refcount), txn=txn)
         # Return the list of objects referenced by pickles just deleted in
         # this round, for decref'ing on the next go 'round.
         return newdeltas
@@ -220,6 +267,29 @@
         # will be aborted.
         self._pending[self._serial] = ABORT
 
+    def _dostore(self, txn, oid, serial, data):
+        conflictresolved = False
+        oserial = self._getCurrentSerial(oid)
+        if oserial is not None and serial <> oserial:
+            # The object exists in the database, but the serial number
+            # given in the call is not the same as the last stored serial
+            # number.  Raise a ConflictError.
+            data = self.tryToResolveConflict(oid, oserial, serial, data)
+            if data:
+                conflictresolved = True
+            else:
+                raise POSException.ConflictError(serials=(oserial, serial))
+        # Optimistically write to the serials and pickles table.  Be sure
+        # to also update the oids table for this object too.
+        newserial = self._serial
+        self._serials.put(oid, newserial, txn=txn)
+        self._pickles.put(oid+newserial, data, txn=txn)
+        self._oids.put(oid, PRESENT, txn=txn)
+        # Return the new serial number for the object
+        if conflictresolved:
+            return ResolvedSerial
+        return newserial
+
     def store(self, oid, serial, data, version, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
@@ -229,43 +299,16 @@
         # All updates must be done with the application lock acquired
         self._lock_acquire()
         try:
-            oserial = self._getCurrentSerial(oid)
-            if oserial is not None and serial <> oserial:
-                # The object exists in the database, but the serial number
-                # given in the call is not the same as the last stored serial
-                # number.  Raise a ConflictError.
-                #
-                # BAW: do application level conflict resolution
-                raise POSException.ConflictError(serials=(oserial, serial))
-            # Optimistically write to the serials and pickles table.  Be sure
-            # to also update the oids table for this object too.
-            newserial = self._serial
-            txn = self._env.txn_begin()
-            try:
-                self._serials.put(oid, newserial, txn=txn)
-                self._pickles.put(oid+newserial, data, txn=txn)
-                self._oids.put(oid, PRESENT, txn=txn)
-            except:
-                txn.abort()
-                self._docheckpoint()
-                raise
-            else:
-                txn.commit()
-                self._docheckpoint()
+            return self._withtxn(self._dostore, oid, serial, data)
         finally:
             self._lock_release()
-        # Return the new serial number for the object
-        return newserial
 
     def _finish(self, tid, u, d, e):
         # Twiddle the pending flag to COMMIT now since after the vote call, we
         # promise that the changes will be committed, no matter what.  The
         # recovery process will check this.
         self._pending[self._serial] = COMMIT
-        self._do(self._docommit, self._serial)
-
-    def _abort(self):
-        self._do(self._doabort, self._serial)
+        self._withtxn(self._docommit, self._serial)
 
     #
     # Accessor interface
@@ -326,3 +369,207 @@
         # So BaseStorage.getSerial() just works.  Note that this storage
         # doesn't support versions.
         return ''
+
+    #
+    # Packing.  In Minimal storage, packing is only required to get rid of
+    # object cycles, since there are no old object revisions.
+    #
+
+    def pack(self, t, zreferencesf):
+        # For all intents and purposes, referencesf here is always going to be
+        # the same as ZODB.referencesf.referencesf.  It's too much of a PITA
+        # to pass that around to the helper methods, so just assert they're
+        # the same.
+        assert zreferencesf == referencesf
+        zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack started')
+        # A simple wrapper around the bulk of packing, but which acquires a
+        # lock that prevents multiple packs from running at the same time.
+        self._packlock.acquire()
+        try:
+            # We don't wrap this in _withtxn() because we're going to do the
+            # operation across several Berkeley transactions, which allows
+            # other work to happen (stores and reads) while packing is being
+            # done.
+            #
+            # Also, we don't care about the pack time, since we don't need to
+            # collect object revisions
+            self._dopack()
+        finally:
+            self._packlock.release()
+        zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack finished')
+
+    def _dopack(self):
+        # Do a mark and sweep for garbage collection.  Calculate the set of
+        # objects reachable from the root.  Anything else is a candidate for
+        # having all their revisions packed away.  The set of reachable
+        # objects lives in the _packmark table.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._mark)
+        finally:
+            self._lock_release()
+        # Now perform a sweep, using oidqueue to hold all object ids for
+        # objects which are not root reachable as of the pack time.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._sweep)
+        finally:
+            self._lock_release()
+        # Once again, collect any objects with refcount zero due to the mark
+        # and sweep garbage collection pass.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._collect_objs)
+        finally:
+            self._lock_release()
+
+    def _mark(self, txn):
+        # Find the oids for all the objects reachable from the root.  To
+        # reduce the amount of in-core memory we need do do a pack operation,
+        # we'll save the mark data in the packmark table.  The oidqueue is a
+        # BerkeleyDB Queue that holds the list of object ids to look at next,
+        # and by using this we don't need to keep an in-memory dictionary.
+        assert len(self._packmark) == 0
+        assert len(self._oidqueue) == 0
+        # Quick exit for empty storages
+        if not self._serials:
+            return
+        # The oid of the object we're looking at, starting at the root
+        oid = ZERO
+        # Start at the root, find all the objects the current revision of the
+        # root references, and then for each of those, find all the objects it
+        # references, and so on until we've traversed the entire object graph.
+        while oid:
+            if self._packmark.has_key(oid):
+                # We've already seen this object; pop the next oid off the
+                # queue and move on, otherwise we'd loop on this oid forever.
+                rec = self._oidqueue.consume()
+                oid = rec and rec[1]
+                continue
+            self._packmark.put(oid, PRESENT, txn=txn)
+            # Get the pickle data for this object
+            tid = self._getCurrentSerial(oid)
+            # Say there's no root object (as is the case in some of the unit
+            # tests), and we're looking up oid ZERO.  Then tid will be None.
+            if tid is not None:
+                data = self._pickles[oid+tid]
+                # Now get the oids of all the objects referenced by this pickle
+                refdoids = []
+                referencesf(data, refdoids)
+                # And append them to the queue for later
+                for oid in refdoids:
+                    self._oidqueue.append(oid, txn)
+            # Pop the next oid off the queue and do it all again
+            rec = self._oidqueue.consume()
+            oid = rec and rec[1]
+        assert len(self._oidqueue) == 0
+
+    def _sweep(self, txn):
+        c = self._serials.cursor(txn=txn)
+        try:
+            rec = c.first()
+            while rec:
+                oid = rec[0]
+                rec = c.next()
+                # If packmark (which knows about all the root reachable
+                # objects) doesn't have a record for this guy, then we can zap
+                # it.  Do so by appending to oidqueue.
+                if not self._packmark.has_key(oid):
+                    self._oidqueue.append(oid, txn)
+        finally:
+            c.close()
+        # We're done with the mark table
+        self._packmark.truncate(txn=txn)
+
+    def _collect_objs(self, txn):
+        orec = self._oidqueue.consume()
+        while orec:
+            oid = orec[1]
+            # Delete the object from the serials table
+            c = self._serials.cursor(txn)
+            try:
+                rec = c.set(oid)
+                while rec and rec[0] == oid:
+                    c.delete()
+                    rec = c.next_dup()
+                # We don't need the refcounts any more, but note that if the
+                # object was never referenced from another object, there may
+                # not be a refcounts entry.
+                try:
+                    self._refcounts.delete(oid, txn=txn)
+                except db.DBNotFoundError:
+                    pass
+            finally:
+                c.close()
+            # Now collect the pickle data and do reference counting
+            c = self._pickles.cursor(txn)
+            try:
+                rec = c.set_range(oid)
+                while rec and rec[0][:8] == oid:
+                    data = rec[1]
+                    c.delete()
+                    rec = c.next()
+                    deltas = {}
+                    self._update(deltas, data, -1)
+                    for oid, delta in deltas.items():
+                        refcount = U64(self._refcounts.get(oid, ZERO)) + delta
+                        assert refcount >= 0
+                        if refcount == 0:
+                            self._oidqueue.append(oid, txn)
+                        else:
+                            self._refcounts.put(oid, p64(refcount), txn=txn)
+            finally:
+                c.close()
+            # We really do want this down here, since the decref loop above
+            # could add more items to the queue.
+            orec = self._oidqueue.consume()
+        assert len(self._oidqueue) == 0
+
+    #
+    # Stuff we don't support
+    #
+
+    def supportsTransactionalUndo(self):
+        return False
+
+    def supportsUndo(self):
+        return False
+
+    def supportsVersions(self):
+        return False
+
+    # Don't implement these
+    #
+    # versionEmpty(self, version)
+    # versions(self, max=None)
+    # loadSerial(self, oid, serial)
+    # getSerial(self, oid)
+    # transactionalUndo(self, tid, transaction)
+    # undoLog(self, first=0, last=-20, filter=None)
+    # history(self, oid, version=None, size=1, filter=None)
+    # iterator(self, start=None, stop=None)
+
+
+
+class _Autopack(threading.Thread):
+    def __init__(self, storage, frequency):
+        threading.Thread.__init__(self)
+        self._storage = storage
+        self._frequency = frequency
+        # Bookkeeping
+        self._stop = False
+        self._nextpack = 0
+
+    def run(self):
+        zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread started')
+        while not self._stop:
+            now = time.time()
+            if now > self._nextpack:
+                # Run the autopack phase
+                self._storage.pack('ignored', referencesf)
+                self._nextpack = now + self._frequency
+            # Now we sleep for a little while before we check again.  Sleep
+            # for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so
+            # as to be as responsive as possible to .stop() calls.
+            time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
+        zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread finished')
+
+    def stop(self):
+        self._stop = True
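
For comparison with the AUTOPACK_CHECK_SLEEP polling above, here is a rough
alternative sketch (not part of this checkin) using a standard-library
threading.Event, which wakes the thread as soon as stop() is called instead
of polling on a fixed interval:

    import threading
    from ZODB.referencesf import referencesf

    class _EventAutopack(threading.Thread):
        # Sketch only: an Event-based variant of the _Autopack thread above.
        def __init__(self, storage, frequency):
            threading.Thread.__init__(self)
            self._storage = storage
            self._frequency = frequency
            self._stopped = threading.Event()

        def run(self):
            while not self._stopped.isSet():
                # wait() returns early as soon as stop() sets the event,
                # otherwise after one full pack interval.
                self._stopped.wait(self._frequency)
                if not self._stopped.isSet():
                    self._storage.pack('ignored', referencesf)

        def stop(self):
            self._stopped.set()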