[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - Full.py:1.44.2.8

Barry Warsaw barry@wooz.org
Wed, 30 Oct 2002 16:05:48 -0500


Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv29771

Modified Files:
      Tag: bdb-nolocks
	Full.py 
Log Message:
Checkpointing after implementing pack().  All but two of the pack tests
pass, and I think I know why those two fail, but I wanted to checkpoint
this code for now.

I had to add a zaprevs queue so as not to keep some bookkeeping
information in an in-core dictionary.  Sigh.
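
The idea, sketched in isolation: a BerkeleyDB Queue database works as a
disk-backed work queue in place of an in-core dictionary.  The snippet below
is a minimal sketch assuming the bsddb3 bindings; the helper name and record
length are illustrative only.

    from bsddb3 import db

    def make_work_queue(env, filename, reclen):
        # DB_QUEUE databases hold fixed-length records, so the record length
        # must be declared before the database is opened.
        q = db.DB(env)
        q.set_re_len(reclen)
        q.open(filename, db.DB_QUEUE, db.DB_CREATE)
        return q

    # append() pushes a record onto the tail; consume() pops the head and
    # returns a (record number, data) pair, or None when the queue is empty.
    #   q.append('X' * 16)
    #   rec = q.consume()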


=== ZODB3/bsddb3Storage/bsddb3Storage/Full.py 1.44.2.7 => 1.44.2.8 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Full.py:1.44.2.7	Thu Oct 24 17:41:25 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Full.py	Wed Oct 30 16:05:47 2002
@@ -221,6 +221,10 @@
         #     This table is a Queue, not a BTree.  It is used during the mark
         #     phase of pack() and contains a list of oids for work to be done.
         #
+        # zaprevs -- [oid+tid]
+        #     This is another queue written during the sweep phase to collect
+        #     all the object revisions that can be packed away.
+        #
         self._serials = self._setupDB('serials', db.DB_DUP)
         self._pickles = self._setupDB('pickles')
         self._refcounts = self._setupDB('refcounts')
@@ -244,6 +248,10 @@
         # BAW: do we need to set the queue extent size?
         self._oidqueue.open(self._prefix + 'oidqueue',
                             db.DB_QUEUE, db.DB_CREATE)
+        self._zaprevs = db.DB(self._env)
+        self._zaprevs.set_re_len(16)
+        self._zaprevs.open(self._prefix + 'zaprevs',
+                           db.DB_QUEUE, db.DB_CREATE)
         # DEBUGGING
         #self._nextserial = 0L
         # END DEBUGGING
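
The fixed record length of 16 follows from zaprevs entries being an 8-byte
oid concatenated with an 8-byte tid.  A trivial sketch, with literal strings
standing in for real ZODB ids:

    oid = '\0' * 7 + '\x01'     # stands in for an 8-byte object id
    tid = '\0' * 7 + '\x02'     # stands in for an 8-byte transaction id
    revid = oid + tid
    assert len(revid) == 16     # hence set_re_len(16) above
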
@@ -256,6 +264,7 @@
         # pack operation will reproduce it faithfully.
         self._oidqueue.truncate()
         self._packmark.truncate()
+        self._zaprevs.truncate()
         # The pendings table may have entries if we crashed before we could
         # abort or commit the outstanding ZODB transaction.
         pendings = self._pending.keys()
@@ -299,6 +308,7 @@
         self._pickleRefcounts.close()
         self._packmark.close()
         self._oidqueue.close()
+        self._zaprevs.close()
         BerkeleyBase.close(self)
 
     def _withtxn(self, meth, *args):
@@ -1265,21 +1275,80 @@
     #
     # There are two types of pack operations, the classic pack and autopack.
     # Classic pack is the full blown mark and sweep operation, removing all
-    # objects not reachable from the root.  This can take a long time,
-    # although the implementation attempts to mitigate both in-core memory
-    # usage and blocking other, non-packing operations.
+    # revisions of all objects not reachable from the root.  This can take a
+    # long time, although the implementation attempts to mitigate both in-core
+    # memory usage and blocking other, non-packing operations.
     #
     # Autopack is a more lightweight operation.  It only removes non-current
     # revisions in a window of transactions, and doesn't do a root
     # reachability test.
     #
 
+    def pack(self, t, zreferencesf):
+        # For all intents and purposes, referencesf here is always going to be
+        # the same as ZODB.referencesf.referencesf.  It's too much of a PITA
+        # to pass that around to the helper methods, so just assert they're
+        # the same.
+        assert zreferencesf == referencesf
+        zLOG.LOG('Full storage', zLOG.INFO, 'pack started')
+        # A simple wrapper around the bulk of packing, but which acquires a
+        # lock that prevents multiple packs from running at the same time.
+        self._packlock.acquire()
+        try:
+            # We don't wrap this in _withtxn() because we're going to do the
+            # operation across several Berkeley transactions.  It makes
+            # bookkeeping harder, but it also allows other work to happen
+            # (stores and reads) while packing is being done.
+            self._dopack(t)
+        finally:
+            self._packlock.release()
+        zLOG.LOG('Full storage', zLOG.INFO, 'pack done')
+
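
For context, this is roughly how a pack gets driven from application code.
A hedged sketch assuming ZODB3-era imports; the Full constructor call and
storage path are illustrative only.

    import time
    from ZODB.referencesf import referencesf

    # storage = Full('/path/to/fullstorage')   # hypothetical constructor call
    # storage.pack(time.time(), referencesf)
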
+    def _dopack(self, t):
+        # t is a TimeTime, or time float; convert this to a TimeStamp object,
+        # using an algorithm similar to what's used in FileStorage.  We know
+        # that our transaction ids, a.k.a. revision ids, are timestamps.  BAW:
+        # This doesn't play nicely if you enable the `debugging tids'
+        #
+        # BAW: should a pack time in the future be a ValueError?  We'd have to
+        # worry about clock skew, so for now, we just set the pack time to the
+        # minimum of t and now.
+        packtime = min(t, time.time())
+        t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
+        packtid = `t0`
+        # Calculate the set of objects reachable from the root.  Anything else
+        # is a candidate for having all its revisions packed away.  The set
+        # of reachable objects lives in the _packmark table.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._mark)
+        finally:
+            self._lock_release()
+        # Now cruise backward through all the transactions at or before the
+        # pack time, getting rid of any objects not reachable from the root,
+        # or any non-current revisions of reachable objects.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._sweep, packtid)
+        finally:
+            self._lock_release()
+        # Now we have the zaprevs table which contains a list of all object
+        # revisions that can get packed away.  So zap 'em.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._collect)
+        finally:
+            self._lock_release()
+
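
The pack-time conversion above, pulled out into a worked sketch.  It assumes
the ZODB3 TimeStamp extension (the exact import location has varied between
releases); variable names are illustrative.

    import time
    from ZODB.TimeStamp import TimeStamp   # assumed import location

    t = time.time() + 3600                 # a pack time an hour in the future
    packtime = min(t, time.time())         # clamp to now; tolerate clock skew
    args = time.gmtime(packtime)[:5] + (packtime % 60,)
    t0 = TimeStamp(*args)                  # year, month, day, hour, min, secs
    packtid = `t0`                         # raw 8-byte form, comparable to tids
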
     def _mark(self, txn):
-        # Find the oids for all the objects reachable.  To reduce the amount
-        # of in-core memory we need do do a pack operation, we'll save the
-        # mark data in a BerkeleyDB table.
+        # Find the oids for all the objects reachable from the root.  To
+        # reduce the amount of in-core memory we need to do a pack operation,
+        # we'll save the mark data in the packmark table.  The oidqueue is a
+        # BerkeleyDB Queue that holds the list of object ids to look at next,
+        # and by using this we don't need to keep an in-memory dictionary.
         assert len(self._packmark) == 0
         assert len(self._oidqueue) == 0
+        assert len(self._zaprevs) == 0
         # Quick exit for empty storages
         if not self._serials:
             return
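
Abstracted away from the Berkeley tables, the mark phase is a breadth-first
walk over the object graph.  A minimal in-memory sketch, where get_refs
stands in for loading the current pickle and running referencesf over it:

    def mark(root_oid, get_refs):
        # seen stands in for the packmark table; queue for the oidqueue.
        seen = {}
        queue = [root_oid]
        while queue:
            oid = queue.pop(0)
            if seen.has_key(oid):
                continue
            seen[oid] = 1
            queue.extend(get_refs(oid))
        return seen
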
@@ -1295,51 +1364,140 @@
             self._packmark.put(oid, PRESENT, txn=txn)
             # Get the pickle data for this object's current version
             serial, tid = self._getSerialAndTidMissingOk(oid)
-            lrevid = self._metadata[oid+tid][16:24]
-            data = self._pickles[oid+lrevid]
-            # Now get the oids of all the objects referenced by this pickle
-            refdoids = []
-            referencesf(data, refdoids)
-            # And append them to the queue for later
-            for oid in refdoids:
-                self._oidqueue.append(oid, txn=txn)
-            # Pop the next oid off the queue and do it all again
+            # If there's no root object (as is the case in some of the unit
+            # tests) and we're looking up oid ZERO, then serial will be None.
+            if serial is not None:
+                lrevid = self._metadata[oid+tid][16:24]
+                data = self._pickles[oid+lrevid]
+                # Now get the oids of all the objects referenced by this pickle
+                refdoids = []
+                referencesf(data, refdoids)
+                # And append them to the queue for later
+                for oid in refdoids:
+                    self._oidqueue.append(oid, txn)
+            # Pop the next oid off the queue and do it all again
             rec = self._oidqueue.consume()
-            oid = rec[1]
+            oid = rec and rec[1]
         assert len(self._oidqueue) == 0
 
-    def _sweep(self, txn):
+    def _sweep(self, txn, packtid):
         # We need to get the pending tid to skip over any serials entries
-        # matching this uncommitted transaction.  It's entirely possible that
-        # the pending transaction doesn't have all it's objects linked into
-        # the root yet.
-        pending = self._pending.keys()[0]
-        c = self._serials.cursor(txn=txn)
+        # matching an uncommitted transaction.  It's possible that the pending
+        # transaction doesn't have all its objects linked into the root yet.
+        cm = ct = None
         try:
-            rec = c.first()
+            cm = self._txnMetadata.cursor(txn=txn)
+            ct = self._txnoids.cursor(txn=txn)
+            # Find the latest transaction at or before the pack time.  To do
+            # this we find the smallest txnMetadata entry that is >= packtid,
+            # and then move back one entry if it isn't an exact match.
+            try:
+                rec = cm.set_range(packtid)
+            except db.DBNotFoundError:
+                rec = cm.last()
+            if rec and rec[0] > packtid:
+                rec = cm.prev()
             while rec:
-                oid, tid = rec
-                if len(tid) <> 8:
-                    tid = tidrec[8:]
-                rec = c.next()
-                if tid == pending:
-                    continue
-                if self._packmark.has_key(oid):
-                    continue
-                # This object is not referenced from any object reachable from
-                # the root.  We can eliminate all traces of it in the
-                # database.
-                self._zapobject(oid, txn)
+                tid, metadata = rec
+                rec = cm.prev()
+                # XXX
+##                if metadata[0] == PROTECTED_TRANSACTION:
+##                    # We've scanned back to the last pack, so we don't need to
+##                    # go any further.
+##                    break
+                # Now look at the objects touched by this transaction.  If it
+                # isn't root reachable, zap the whole object.  If it is, but
+                # this revision isn't the current revision, then just this
+                # revision is packable.
+                orec = ct.set(tid)
+                while orec:
+                    otid, oid = orec
+                    orec = ct.next()
+                    if otid <> tid:
+                        break
+                    if self._packmark.has_key(oid):
+                        cserial, ctid = self._getSerialAndTid(oid)
+                        if tid <> cserial:
+                            # This is not the current revision
+                            self._zaprevs.append(oid+tid, txn)
+                    else:
+                        # The whole object is packable
+                        self._zaprevs.append(oid+DNE, txn)
         finally:
-            c.close()
-        # And now we're done with the packmark table
+            if cm: cm.close()
+            if ct: ct.close()
+        # We're done with the mark table
         self._packmark.truncate(txn=txn)
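
The cursor positioning at the top of _sweep() is the usual BerkeleyDB idiom
for "largest key less than or equal to X".  Here it is in isolation, as a
sketch assuming a bsddb3 BTree cursor:

    from bsddb3 import db

    def latest_at_or_before(cursor, packtid):
        # set_range() positions at the smallest key >= packtid; if that
        # overshoots (or nothing is >= packtid), step back one record.
        try:
            rec = cursor.set_range(packtid)
        except db.DBNotFoundError:
            rec = cursor.last()
        if rec and rec[0] > packtid:
            rec = cursor.prev()
        return rec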
 
+    def _collect(self, txn):
+        rec = self._zaprevs.consume()
+        while rec:
+            revid = rec[1]
+            self._zaprevision(revid, txn)
+            rec = self._zaprevs.consume()
+        # And now we're done with this table too
+        self._zaprevs.truncate(txn=txn)
+
+    def _decref(self, deltas, txn):
+        for oid, delta in deltas.items():
+            refcount = U64(self._refcounts.get(oid, ZERO)) + delta
+            if refcount <= 0:
+                self._zaprevs.append(oid+DNE, txn)
+            else:
+                self._refcounts.put(oid, p64(refcount), txn=txn)
+
+    def _zaprevision(self, revid, txn):
+        # This method gets called when we're just zapping a single revision of
+        # an object, which we'd do if the object is still reachable from the
+        # root.  See _zapobject() for a more efficient way to delete all
+        # traces of the object, if it's no longer reachable from the root.
+        #
+        # Get the metadata record for this revision, so we can find the
+        # associated pickle data.  Then we can zap the metadata record
+        oid = revid[:8]
+        tid = revid[8:]
+        if tid == DNE:
+            # The entire object has been reference counted away
+            self._zapobject(oid, txn)
+            return
+        metadata = self._metadata[revid]
+        self._metadata.delete(revid, txn=txn)
+        vid, nvrevid, lrevid, prevrevid = unpack('>8s8s8s8s', metadata)
+        # Decrement the pickle reference count
+        key = oid+lrevid
+        refcount = U64(self._pickleRefcounts.get(key, ZERO)) - 1
+        if refcount <= 0:
+            # We can collect this pickle
+            self._pickleRefcounts.delete(key, txn=txn)
+            data = self._pickles[key]
+            self._pickles.delete(key, txn=txn)
+            deltas = {}
+            self._update(deltas, data, -1)
+            self._decref(deltas, txn)
+        else:
+            self._pickleRefcounts.put(key, p64(refcount), txn=txn)
+        # Delete the txnoid table entry for this revision
+        c = self._txnoids.cursor(txn=txn)
+        try:
+            c.set_both(tid, oid)
+            c.delete()
+        finally:
+            c.close()
+
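
The pickle reference counting in _zaprevision() follows the usual pattern of
8-byte packed counters.  A sketch with a plain dictionary standing in for the
pickleRefcounts table; p64/U64 are ZODB's packed-integer helpers (the import
location is assumed):

    from ZODB.utils import p64, U64
    ZERO = '\0' * 8

    def decref_pickle(refcounts, key):
        # Returns true when the count drops to zero and the pickle itself
        # can be collected.
        count = U64(refcounts.get(key, ZERO)) - 1
        if count <= 0:
            if refcounts.has_key(key):
                del refcounts[key]
            return 1
        refcounts[key] = p64(count)
        return 0
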
     def _zapobject(self, oid, txn):
         # Delete all current serial number records
         c = self._serials.cursor(txn=txn)
         try:
-            rec = c.get(oid)
+            try:
+                rec = c.set(oid)
+            except db.DBNotFoundError:
+                # zaprevs could have multiple entries in it for the same
+                # oid+DNE combination.  Rather than make sure zaprevs has
+                # unique keys (which would require an in-core dict or yet
+                # another table), we'll just assume that if the oid isn't in
+                # the serials table, we've already collected it in a previous
+                # call of _zapobject().
+                return
             while rec:
                 if rec[0] <> oid:
                     break
@@ -1362,6 +1520,8 @@
                 self._update(deltas, data, -1)
         finally:
             c.close()
+        # Update the reference counts based on the deltas
+        self._decref(deltas, txn)
         # Delete the pickleRefcounts entry for all revisions of this object
         c = self._pickleRefcounts.cursor(txn=txn)
         try:
@@ -1373,8 +1533,13 @@
                 rec = c.next()
         finally:
             c.close()
-        # Delete the entry in the refcounts table for this object
-        self._refcounts.delete(oid, txn=txn)
+        # Delete the entry in the refcounts table for this object, if there is
+        # one.  Some of the unit tests don't link objects so there'll never be
+        # refcounts for them.
+        try:
+            self._refcounts.delete(oid, txn=txn)
+        except db.DBNotFoundError:
+            pass
         # Now cruise through all the metadata records for this object, keeping
         # track of the tids and vids so that we can clean up the txnoids and
         # currentVersions tables
@@ -1390,9 +1555,10 @@
                 c.delete()
                 rec = c.next()
                 tid = revid[8:]
-                vid = metadata[:8]
                 tids[tid] = 1
-                vids[vid] = 1
+                vid = metadata[:8]
+                if vid <> ZERO:
+                    vids[vid] = 1
         finally:
             c.close()
         # Zap currentVersions entries
@@ -1419,112 +1585,6 @@
                 c.delete()
         finally:
             c.close()
-        return deltas
-
-    def _dopack(self, txn, t):
-        # t is a TimeTime, or time float, convert this to a TimeStamp object,
-        # using an algorithm similar to what's used in FileStorage.  We know
-        # that our transaction ids, a.k.a. revision ids, are timestamps.  BAW:
-        # This doesn't play nicely if you enable the `debugging revids'
-        #
-        # BAW: should a pack time in the future be a ValueError?  We'd have to
-        # worry about clock skew, so for now, we just set the pack time to the
-        # minimum of t and now.
-        packtime = min(t, time.time())
-        t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
-        packtid = `t0`
-        # Calculate the set of objects reachable from the root.  Anything else
-        # is a candidate for having all their revisions packed away.
-        self._mark(txn)
-        # We now cruise through all the objects we know about, i.e. the keys
-        # of the serials table, looking at all the object revisions earlier
-        # than the pack time.  If the revision is not the current revision,
-        # then it's a packable revision.  We employ a BDB trick of set_range()
-        # to give us the smallest record greater than or equal to the one we
-        # ask for.  We move to the one just before that, and cruise backwards.
-        #
-        # This should also make us immune to evil future-pack time values,
-        # although it would still be better to raise a ValueError in those
-        # situations.  This is a dictionary keyed off the object id, with
-        # values which are a list of revisions (oid+tid) that can be packed.
-        packablerevs = {}
-        c = self._metadata.cursor()
-        try:
-            # BAW: can two threads be packing at the same time?  If so, we
-            # need to handle that.  If not, we should enforce that with a
-            # pack-lock.
-            for oid in self._serials.keys():
-                try:
-                    rec = c.set_range(oid+packtid)
-                    # The one just before this should be the largest record
-                    # less than or equal to the key, i.e. the object revision
-                    # just before the given pack time.
-                    rec = c.prev()
-                except db.DBNotFoundError:
-                    # Perhaps the last record in the database is the last one
-                    # containing this oid?
-                    rec = c.last()
-                # Now move backwards in time to look at all the revisions of
-                # this object.  All but the current one are packable, unless
-                # the object isn't reachable from the root, in which case, all
-                # its revisions are packable.
-                while rec:
-                    key, data = rec
-                    rec = c.prev()
-                    # Make sure we're still looking at revisions for this
-                    # object
-                    if oid <> key[:8]:
-                        break
-                    if not reachables.has_key(oid):
-                        packablerevs.setdefault(oid, []).append(key)
-                    # Otherwise, if this isn't the current revision for this
-                    # object, then it's packable.
-                    else:
-                        serial, tid = self._getSerialAndTid(oid)
-                        if tid <> key[8:]:
-                            packablerevs.setdefault(oid, []).append(key)
-        finally:
-            c.close()
-        # We now have all the packable revisions we're going to handle.  For
-        # each object with revisions that we're going to pack away, acquire
-        # the storage lock so we can do that without fear of trampling by
-        # other threads (i.e. interaction of transactionalUndo() and pack()).
-        #
-        # This set contains the oids of all objects that have been decref'd
-        # to zero by the pack operation.  To avoid recursion, we'll just note
-        # them now and handle them in a loop later.
-        #
-        # BAW: should packs be transaction protected?
-        decrefoids = {}
-        for oid in packablerevs.keys():
-            self._lock_acquire()
-            try:
-                for key in packablerevs[oid]:
-                    self._zaprevision(key, decrefoids, referencesf)
-            finally:
-                self._lock_release()
-        # While there are still objects to collect, continue to do so.
-        # Note that collecting an object may reveal more objects that are
-        # dec refcounted to zero.
-        while decrefoids:
-            oid, ignore = decrefoids.popitem()
-            self._zapobject(oid, decrefoids, referencesf)
-
-    def pack(self, t, zreferencesf):
-        # For all intents and purposes, referencesf here is always going to be
-        # the same as ZODB.referencesf.referencesf.  It's too much of a PITA
-        # to pass that around to the helper methods, so just assert they're
-        # the same.
-        assert zreferencesf == referencesf
-        zLOG.LOG('Full storage', zLOG.INFO, 'pack started')
-        # A simple wrapper around the bulk of packing, but which acquires a
-        # lock that prevents multiple packs from running at the same time.
-        self._packlock.acquire()
-        try:
-            self._withtxn(self._dopack, t)
-        finally:
-            self._packlock.release()
-        zLOG.LOG('Full storage', zLOG.INFO, 'pack done')
 
     #
     # GCable interface, for cyclic garbage collection (untested)