[Zodb-checkins] CVS: StandaloneZODB/bsddb3Storage/bsddb3Storage - Full.py:1.41

Barry Warsaw barry@wooz.org
Fri, 23 Aug 2002 13:20:34 -0400


Update of /cvs-repository/StandaloneZODB/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv2245/bsddb3Storage

Modified Files:
	Full.py 
Log Message:
Merging in changes from the bsddb3Storage-picklelog-branch.  Briefly:

The picklelog branch attempts to improve performance and reduce the
possibility of lock file exhaustion by optimistically writing pickle
and metadata tables at the store() call instead of during the
_finish() call.  store() will be bounded in the number of objects it
touches, but _finish() is unbounded, so we try to reduce the number of
database pages the latter might actually touch.

Other performance improvements are implemented based on hotshot
profiling.  The use of an extension module for Python 2.2 also speeds
up a critical loop.

This also implements the periodic checkpointing to improve recovery
times when the database is not cleanly shutdown.


=== StandaloneZODB/bsddb3Storage/bsddb3Storage/Full.py 1.40 => 1.41 ===
--- StandaloneZODB/bsddb3Storage/bsddb3Storage/Full.py:1.40	Thu Mar 21 12:27:47 2002
+++ StandaloneZODB/bsddb3Storage/bsddb3Storage/Full.py	Fri Aug 23 13:20:34 2002
@@ -2,14 +2,14 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
 
 """Berkeley storage with full undo and versioning support.
@@ -24,6 +24,12 @@
 import struct
 import time
 
+from cPickle import loads, Pickler
+Pickler = Pickler()
+Pickler.fast = 1 # Don't use a memo
+fast_pickle_dumps = Pickler.dump
+del Pickler
+
 # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
 # http://pybsddb.sourceforge.net
 from bsddb3 import db
@@ -55,6 +61,14 @@
 # DEBUGGING
 #DNE = 'nonexist'                                  # does not exist
 
+try:
+    # Python 2.2
+    from _helper import incr
+except ImportError:
+    # Python 2.1
+    def incr(refcount, delta):
+        return p64(U64(refcount) + delta)
+
 
 
 class Full(BerkeleyBase, ConflictResolvingStorage):
@@ -98,6 +112,12 @@
         #     Maps the concrete object referenced by oid+revid to that
         #     object's data pickle.
         #
+        # picklelog -- {oid+revid -> ''}
+        #     Keeps a log of pickles that haven't been committed yet.
+        #     This allows us to write pickles as we get them in the
+        #     in separate BDB transactions.  The value of the mapping is
+        #     ignored.
+        #
         # These are used only by the Full implementation.
         #
         # vids -- {version_string -> vid}
@@ -164,9 +184,10 @@
         #     Maps the concrete object referenced by oid+tid to the reference
         #     count of its pickle.
         #
-        # Tables common to the base framework 
+        # Tables common to the base framework
         self._serials = self._setupDB('serials')
         self._pickles = self._setupDB('pickles')
+        self._picklelog = self._setupDB('picklelog')
         # These are specific to the full implementation
         self._vids            = self._setupDB('vids')
         self._versions        = self._setupDB('versions')
@@ -176,6 +197,7 @@
         self._txnoids         = self._setupDB('txnoids', db.DB_DUP)
         self._refcounts       = self._setupDB('refcounts')
         self._pickleRefcounts = self._setupDB('pickleRefcounts')
+
         # Initialize our cache of the next available version id.
         record = self._versions.cursor().last()
         if record:
@@ -186,11 +208,17 @@
         else:
             self.__nextvid = 0L
         # DEBUGGING
+        # NOTE: some tests will fail if you enable debugging serial numbers
+        # because it breaks the default assumption that serial numbers are
+        # timestamps.  Things like packing and undoing will break.
         #self._nextserial = 0L
-        
+        #self.profiler = hotshot.Profile('profile.dat', lineevents=1)
+
     def close(self):
+        #self.profiler.close()
         self._serials.close()
         self._pickles.close()
+        self._picklelog.close()
         self._vids.close()
         self._versions.close()
         self._currentVersions.close()
@@ -214,7 +242,18 @@
             self._commitlog = FullLog(dir=self._env.db_home)
         self._commitlog.start()
 
+# To turn on hotshot profiling, uncomment the following function, and rename
+# _finish() to _real_finish().  Also, uncomment out the creation of the
+# profiler in _setupDBs() above, and the closing of the profiler in close()
+# below.  Then check out the profout.py file for dumping out the profiling
+# information.
+#
+##    def _finish(self, tid, u, d, e):
+##        self.profiler.runcall(self._real_finish, tid, u, d, e)
+
     def _finish(self, tid, u, d, e):
+##        pack = struct.pack
+##        unpack = struct.unpack
         # This is called from the storage interface's tpc_finish() method.
         # Its responsibilities are to finish the transaction with the
         # underlying database.
@@ -252,65 +291,60 @@
             self._txnMetadata.put(tid,
                                   UNDOABLE_TRANSACTION + lengths + u + d + e,
                                   txn=txn)
+            picklekeys = []
+            metadata = []
+            picklerefcounts = {}
+            serials = []
+            refcounts = {}
             while 1:
                 rec = self._commitlog.next()
                 if rec is None:
                     break
                 op, data = rec
-                if op == 'o':
+                if op in 'ox':
                     # This is a `versioned' object record.  Information about
                     # this object must be stored in the pickle table, the
                     # object metadata table, the currentVersions tables , and
                     # the transactions->oid table.
-                    oid, vid, nvrevid, lrevid, pickle, prevrevid = data
+                    oid, vid, nvrevid, lrevid, refdoids, prevrevid = data
                     key = oid + tid
-                    if pickle:
-                        # This was the result of a store() call which gives us
-                        # a brand new pickle, so we need to update the pickles
-                        # table.  The lrevid will be empty, and we make it the
-                        # tid of this transaction
+                    if refdoids is not None:
+                        # This was the result of a store() call which gave us
+                        # new pickle data.  Since the pickle is already
+                        # stored, we just need to twiddle with reference
+                        # counts.  We also need to clear the picklelog for
+                        # this object revision.
                         #
                         # Otherwise, this was the result of a commitVersion()
                         # or abortVersion() call, essentially moving the
                         # object to a new version.  We don't need to update
-                        # the pickle table because we aren't creating a new
+                        # any of the tables because we aren't creating a new
                         # pickle.
-                        self._pickles.put(key, pickle, txn=txn)
                         lrevid = tid
                         # Boost the refcount of all the objects referred to by
-                        # this pickle.  referencesf() scans a pickle and
-                        # returns the list of objects referenced by the
-                        # pickle.  BAW: the signature of referencesf() has
-                        # changed for Zope 2.4, to make it more convenient to
-                        # use.  Gotta stick with the backwards compatible
-                        # version for now.
+                        # this pickle.
                         #
                         # FIXME: need to watch for two object revisions in the
                         # same transaction and only bump the refcount once,
                         # since we only keep the last of any such revisions.
-                        refdoids = []
-                        referencesf(pickle, refdoids)
                         for roid in refdoids:
-                            refcount = self._refcounts.get(roid, ZERO, txn=txn)
-                            refcount = p64(U64(refcount) + 1)
-                            self._refcounts.put(roid, refcount, txn=txn)
+                            refcounts[roid] = refcounts.get(roid, 0) + 1
                     # Update the metadata table
-                    self._metadata.put(key, vid+nvrevid+lrevid+prevrevid,
-                                       txn=txn)
+                    if op == 'o':
+                        # `x' opcode does an immediate write to metadata
+                        metadata.append(
+                            (key, ''.join((vid,nvrevid,lrevid,prevrevid))))
                     # If we're in a real version, update this table too.  This
                     # ends up putting multiple copies of the vid/oid records
                     # in the table, but it's easier to weed those out later
                     # than to weed them out now.
                     if vid <> ZERO:
                         self._currentVersions.put(vid, oid, txn=txn)
-                    self._serials.put(oid, tid, txn=txn)
-                    self._txnoids.put(tid, oid, txn=txn)
+                    serials.append((oid, tid))
                     # Update the pickle's reference count.  Remember, the
                     # refcount is stored as a string, so we have to do the
                     # string->long->string dance.
-                    refcount = self._pickleRefcounts.get(key, ZERO, txn=txn)
-                    refcount = p64(U64(refcount) + 1)
-                    self._pickleRefcounts.put(key, refcount, txn=txn)
+                    picklerefcounts[key] = picklerefcounts.get(key, 0) + 1
                 elif op == 'v':
                     # This is a "create-a-version" record
                     version, vid = data
@@ -327,18 +361,56 @@
                             rec = c.next_dup()
                     finally:
                         c.close()
+            # It's actually faster to boogie through this list twice
+            #print >> sys.stderr, 'start:', self._lockstats()
+            for oid, tid in serials:
+                self._txnoids.put(tid, oid, txn=txn)
+            #print >> sys.stderr, 'post-txnoids:', self._lockstats()
+            for oid, tid in serials:
+                self._serials.put(oid, tid, txn=txn)
+            #print >> sys.stderr, 'post-serials:', self._lockstats()
+            for key, data in metadata:
+                self._metadata.put(key, data, txn=txn)
+            #print >> sys.stderr, 'post-metadata:', self._lockstats()
+            for roid, delta in refcounts.items():
+                refcount = self._refcounts.get(roid, ZERO, txn=txn)
+                self._refcounts.put(roid, incr(refcount, delta), txn=txn)
+            #print >> sys.stderr, 'post-refcounts:', self._lockstats()
+            for key, delta in picklerefcounts.items():
+                refcount = self._pickleRefcounts.get(key, ZERO, txn=txn)
+                self._pickleRefcounts.put(key, incr(refcount, delta), txn=txn)
+            # We're done with the picklelog
+            self._picklelog.truncate(txn)
+            #print >> sys.stderr, 'loop-finish:', self._lockstats()
+        # Handle lock exhaustion differently
+        except db.DBNoMemoryError, e:
+            txn.abort()
+            self._docheckpoint()
+            raise POSException.TransactionTooLargeError, e
         except:
             # If any errors whatsoever occurred, abort the transaction with
             # Berkeley, leave the commit log file in the PROMISED state (since
             # its changes were never committed), and re-raise the exception.
             txn.abort()
+            self._docheckpoint()
             raise
         else:
             # Everything is hunky-dory.  Commit the Berkeley transaction, and
             # reset the commit log for the next transaction.
             txn.commit()
+            self._docheckpoint()
             self._closelog()
 
+    def _abort(self):
+        # We need to clear the picklelog and all the stored pickles in the
+        # pickle log, since we're abort this transaction.
+        for key in self._picklelog.keys():
+            del self._pickles[key]
+            del self._metadata[key]
+        # Done with the picklelog
+        self._picklelog.truncate()
+        BerkeleyBase._abort(self)
+
     #
     # Do some things in a version
     #
@@ -551,7 +623,7 @@
 
     def loadSerial(self, oid, serial):
         return self._loadSerialEx(oid, serial)[0]
-                        
+
     def getSerial(self, oid):
         # Return the revision id for the current revision of this object,
         # irrespective of any versions.
@@ -579,6 +651,33 @@
             self._commitlog.write_new_version(version, vid)
         return vid
 
+    def _log_object(self, oid, vid, nvrevid, data, oserial):
+        # Save data for later commit.  We do this by writing the pickle
+        # directly to the pickle table and saving the pickle key in the pickle
+        # log.  We'll also save the metadata using the same technique.  We
+        # extract the references and save them in the transaction log.
+        #
+        # Get the oids to the objects this pickle references
+        refdoids = []
+        referencesf(data, refdoids)
+        # Record the update to this object in the commit log.
+        self._commitlog.write_object(oid, vid, nvrevid, refdoids, oserial)
+        # Save the pickle in the database:
+        txn = self._env.txn_begin()
+        try:
+            key = oid + self._serial
+            self._pickles.put(key, data, txn=txn)
+            self._metadata.put(
+                key,
+                ''.join((vid, nvrevid, self._serial, oserial)),
+                txn=txn)
+            self._picklelog.put(key, '', txn=txn)
+        except:
+            txn.abort()
+            raise
+        else:
+            txn.commit()
+
     def store(self, oid, serial, data, version, transaction):
         # Transaction equivalence guard
         if transaction is not self._transaction:
@@ -643,8 +742,8 @@
                         tuple(map(U64, (oid, ovid, vid))))
                 else:
                     nvrevid = onvrevid
-            # Record the update to this object in the commit log.
-            self._commitlog.write_object(oid, vid, nvrevid, data, oserial)
+            # Store the object
+            self._log_object(oid, vid, nvrevid, data, oserial)
         finally:
             self._lock_release()
         # Return our cached serial number for the object.  If conflict
@@ -776,8 +875,7 @@
                 # see duplicate oids in this iteration.
                 oids[oid] = 1
             for oid, vid, nvrevid, data, prevrevid in newstates:
-                self._commitlog.write_object(oid, vid, nvrevid, data,
-                                             prevrevid)
+                self._log_object(oid, vid, nvrevid, data, prevrevid)
                 oids[oid] = 1
             return oids.keys()
         finally:
@@ -1252,7 +1350,7 @@
 
     def gcRefcount(oid):
         """Return the reference count of the specified object.
-        
+
         Raises KeyError if there is no object with oid.  Both the oid argument
         and the returned reference count are integers.
         """
@@ -1392,7 +1490,7 @@
 class _RecordsIterator:
     """Provide transaction meta-data and forward iteration through the
     transactions in a storage.
-    
+
     Items *must* be accessed sequentially (e.g. with a for loop).
     """