[Checkins] SVN: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py - Ignore duplicate data records (caused by an old blob bug).

Jim Fulton jim at zope.com
Thu Dec 17 14:06:31 EST 2009


Log message for revision 106705:
  - Ignore duplicate data records (caused by an old blob bug).
  
  - Handle deleted records in recover.
  
  Two changes to speed up writing:
  
  - Use an in-memory transaction log when practical. (The log is used
    to save up data until tpc_vote, to minimize the amount of time we
    hold the bdb locks.  A usage sketch of the new log appears after
    the diff.)
  
  - Combine the transactions and transaction_oids databases into a
    single database, reducing the number of records and tables, and
    thus the number of database pages that have to be updated.  (A
    sketch of the new record layout follows this log message.)
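
  A sketch of the new record layout (an illustration, not part of the
  commit): each value in the transactions database is now a single
  pickle of a (status, ext, oids) triple, replacing the old
  status+transaction_pickle value plus the separate per-oid rows in
  transaction_oids.  The helper names are hypothetical; the actual
  code inlines the cPickle calls, as the diff below shows.

    import cPickle

    def encode_transaction(status, ext, oids):
        # One value per tid; oids lists the oids written by the
        # transaction.
        return cPickle.dumps((status, ext, oids))

    def decode_transaction(value):
        # Returns (status, ext, oids), as iteration and packing
        # expect.
        return cPickle.loads(value)

    status, ext, oids = decode_transaction(
        encode_transaction(' ', {'user_name': 'jim'}, ['\x00' * 8]))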
  

Changed:
  U   zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py

-=-
Modified: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
===================================================================
--- zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py	2009-12-17 16:15:52 UTC (rev 106704)
+++ zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py	2009-12-17 19:06:31 UTC (rev 106705)
@@ -15,9 +15,10 @@
 from bsddb3 import db
 from ZODB.utils import p64, u64, z64
 import cPickle
+import cStringIO
 import logging
-import marshal
 import os
+import struct
 import tempfile
 import threading
 import time
@@ -33,9 +34,6 @@
 def n64(tid):
     return p64(868082074056920076L-u64(tid))
 
-# XXX Still need checkpoint strategy.
-# Maybe initial config file when creating env.
-
 def retry_on_deadlock(f):
 
     def func(*args, **kw):
@@ -132,16 +130,7 @@
 
         self._len_lock = threading.Lock()
 
-        # transaction_oids: {tid->[oids]}
-        self.transaction_oids = db.DB(self.env)
-        self.transaction_oids.set_flags(db.DB_DUPSORT)
-        self.transaction_oids.open('transaction_oids', dbtype=db.DB_BTREE,
-                                   flags=(db.DB_CREATE | db.DB_THREAD |
-                                          db.DB_AUTO_COMMIT |
-                                          db.DB_MULTIVERSION),
-                                   )
-
-        # transactions: {tid ->status+transaction_pickle}
+        # transactions: {tid -> pickle((status, ext, oids))}
         self.transactions = db.DB(self.env)
         self.transactions.open('transactions', dbtype=db.DB_BTREE,
                                flags=(db.DB_CREATE | db.DB_THREAD |
@@ -198,7 +187,6 @@
         self.finish_packing()
         self.finish_checkpointing()
         self.data.close()
-        self.transaction_oids.close()
         self.transactions.close()
         self.env.close()
         if not self._read_only:
@@ -216,7 +204,7 @@
 
     def _history_entry(self, record, txn):
         tid = n64(record[:8])
-        transaction = cPickle.loads(self.transactions.get(tid, txn=txn)[1:])
+        transaction = cPickle.loads(self.transactions.get(tid, txn=txn))[1]
         transaction.update(
             size = len(record)-8,
             tid = tid,
@@ -259,10 +247,10 @@
                     while 1:
                         if not kv:
                             return
-                        tid, ext = kv
+                        tid, info = kv
                         if tid > stop:
                             return
-                        yield Records(self, tid, ext[0], ext[1:])
+                        yield Records(self, tid, *cPickle.loads(info))
                         kv = transactions.get(tid, flags=db.DB_NEXT)
                         n += 1
                         if n >= 1000:
@@ -386,32 +374,19 @@
                 return None
 
             with self.cursor(self.transactions, txn) as transactions:
-                kv = transactions.get(
-                    tid, flags=db.DB_SET_RANGE, doff=0, dlen=0)
+                kv = transactions.get(tid, flags=db.DB_SET_RANGE)
                 if kv is None:
                     return None
                 tid = kv[0]
                 if tid > pack_tid:
                     return None
 
-                # Set the status flag to indicate that the transaction
-                # was packed.
-                transactions.put(tid, 'p', db.DB_CURRENT, doff=0, dlen=1)
+                ntid = n64(tid)
 
-            ntid = n64(tid)
+                ext, oids = cPickle.loads(kv[1])[1:]
+                new_oids = []
+                for oid in oids:
 
-            with self.cursor(self.transaction_oids, txn) as transaction_oids:
-                # Find the smallest tid >= the one we picked
-                ttid, oid = transaction_oids.get(tid, flags=db.DB_SET_RANGE)
-                assert ttid == tid
-
-                # Iterate over the oids for this tid and pack each one.
-                # Note that we treat the tid we're looking at as the
-                # pack time. That is, as we look at each transaction,
-                # we pack to that time.  This way, we can pack
-                # *very* incrementally.
-                while 1:
-
                     # Find the first record for the oid whos tid is <=
                     # the pack time. (we use negative tids, so >=)
                     # This is the current record as of the pack time
@@ -420,7 +395,8 @@
                         if not kr:
                             kr = data.get(oid, flags=db.DB_SET)
                             if kr[1][:8] < ntid:
-                                # the one record found is after the pack time
+                                # the one record found is after
+                                # the pack time
                                 continue
 
                         doid, record = kr
@@ -436,6 +412,7 @@
                             removed_oids += 1
                         else:
                             deleted_oid = False
+                            new_oids.append(oid)
 
                         # OK, we have the current record as of the tid,
                         # we can remove later ones
@@ -463,12 +440,13 @@
                             # maybe transactions
                             self._pack_remove_oid_tid(dtid, oid, txn)
 
-                    # continue iterating over the oids for this tid
-                    kv = transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
-                    if kv is None:
-                        break
-                    assert kv[0] == tid
-                    oid = kv[1]
+                if new_oids:
+                    # Update the status flag and oids.
+                    transactions.put(
+                        tid, cPickle.dumps(('p', ext, new_oids)), db.DB_CURRENT)
+                else:
+                    # The transaction is now empty; delete it.
+                    transactions.delete()
 
             self.misc.put('pack', tid, txn=txn)
 
@@ -481,21 +459,10 @@
         return tid
 
     def _pack_remove_oid_tid(self, tid, oid, txn):
-        with self.cursor(self.transaction_oids, txn) as transaction_oids:
-            kr = transaction_oids.get(tid, oid, flags=db.DB_GET_BOTH_RANGE)
-            if not kr:
-                kr = transaction_oids.get(tid, flags=db.DB_SET)
-            ttid, toid = kr
-            if toid != oid or ttid != tid:
-                raise AssertionError("Bad oid+tid lookup",
-                                     oid, tid, toid, ttid)
-            transaction_oids.delete()
-            # OK, we deleted the record. Maybe it was the last one. Try to get
-            # the first, and, if we can't, then delete the transaction record.
-            kv = transaction_oids.get(tid, flags=db.DB_SET)
-            if kv is None:
-                # OK, no more oids for this tid, remive it from transactions
-                self.transactions.delete(tid, txn)
+        ext, oids = cPickle.loads(
+            self.transactions.get(tid, txn=txn, flags=db.DB_RMW))[1:]
+        oids.remove(oid)
+        self.transactions.put(tid, cPickle.dumps(('p', ext, oids)), txn)
 
     def _remove_blob_files_tagged_for_removal_during_pack(self, removed):
         for oid in removed:
@@ -562,7 +529,7 @@
                     data = rdata
                     result = ZODB.ConflictResolution.ResolvedSerial
 
-        marshal.dump((oid, n64(self._tid)+data), self._log_file)
+        self._log(oid, n64(self._tid)+data)
         return result
 
     def restore(self, oid, serial, data, version, prev_txn, transaction):
@@ -570,7 +537,7 @@
         if transaction is not self._transaction:
             raise ZODB.POSException.StorageTransactionError(self, transaction)
 
-        marshal.dump((oid, n64(serial)+data), self._log_file)
+        self._log(oid, n64(serial)+(data or ''))
 
     def deleteObject(self, oid, oldserial, transaction):
         if transaction is not self._transaction:
@@ -580,7 +547,7 @@
             raise ZODB.POSException.ConflictError(
                 oid=oid, serials=(n64(committed_tid), oldserial))
 
-        marshal.dump((oid, n64(self._tid)), self._log_file)
+        self._log(oid, n64(self._tid))
 
     def tpc_begin(self, transaction, tid=None, status=' '):
         if self._read_only:
@@ -595,7 +562,6 @@
         ext = transaction._extension.copy()
         ext['user_name'] = transaction.user
         ext['description'] = transaction.description
-        ext = status+cPickle.dumps(ext, 1)
 
         if tid is None:
             now = time.time()
@@ -607,16 +573,13 @@
             self._ts = ZODB.TimeStamp.TimeStamp(tid)
             self._tid = tid
 
-        fd, self._log_path = tempfile.mkstemp('bsddb')
-        self._log_file = open(self._log_path, 'r+b')
-        os.close(fd)
-        marshal.dump((tid, ext), self._log_file)
+        self._log = ObjectLog()
+        self._log(tid, status, ext)
         self._new_obs = 0
 
     def _tpc_cleanup(self):
         self._transaction = self._txn = None
-        self._log_file.close()
-        os.remove(self._log_path)
+        self._log.close()
         self._commit_lock.release()
 
     def tpc_abort(self, transaction):
@@ -641,13 +604,21 @@
 
     _transaction_id_suffix = 'x' * (db.DB_GID_SIZE - 8)
     def tpc_vote(self, transaction):
+        log = iter(self._log)
+        tid, status, ext = log.next()
+        oids = []
         self._txn = txn = self.env.txn_begin()
-        self._log_file.seek(0)
-        tid, ext = marshal.load(self._log_file)
-        self.transactions.put(tid, ext, txn=txn)
-        for oid, record in marshal_iterate(self._log_file):
-            self.data.put(oid, record, txn=txn)
-            self.transaction_oids.put(tid, oid, txn=txn)
+        for oid, record in log:
+            try:
+                self.data.put(oid, record, txn=txn)
+            except db.DBKeyExistError:
+                # If the entire record is a duplicate
+                # (caused by an old blob bug), don't
+                # write it again.
+                pass
+            else:
+                oids.append(oid)
+        self.transactions.put(tid, cPickle.dumps((status, ext, oids)), txn=txn)
         txn.prepare(self._tid+self._transaction_id_suffix)
 
     ##############################################################
@@ -676,13 +647,6 @@
 
 Storage = BSDDBStorage # easier to type alias :)
 
-def marshal_iterate(f):
-    while 1:
-        try:
-            yield marshal.load(f)
-        except EOFError:
-            break
-
 class TransactionContext(object):
 
     def __init__(self, txn):
@@ -730,14 +694,14 @@
 
 class Records(object):
 
-    def __init__(self, storage, tid, status, ext):
+    def __init__(self, storage, tid, status, ext, oids):
         self.storage = storage
         self.tid = tid
-        ext = cPickle.loads(ext)
         self.user = ext.pop('user_name', '')
         self.description = ext.pop('description', '')
         self.status = status
         self.extension = ext
+        self.oids = oids
 
     @apply
     def _extension():
@@ -751,24 +715,17 @@
     def _iter(self):
         tid = self.tid
         ntid = n64(tid)
-        with self.storage.txn(db.DB_TXN_SNAPSHOT) as txn:
-            with self.storage.cursor(self.storage.transaction_oids, txn
-                                     ) as transaction_oids:
-                kv = transaction_oids.get(tid, flags=db.DB_SET)
-                while kv:
-                    ttid, oid = kv
-                    assert ttid == tid
-                    with self.storage.cursor(self.storage.data, txn
-                                             ) as data:
-                        kr = data.get(oid, ntid, flags=db.DB_GET_BOTH_RANGE)
-                        if kr is None:
-                            kr = data.get(oid, flags=db.DB_SET)
-                        doid, rec = kr
-                        assert doid == oid
-                        dntid = rec[:8]
-                        assert dntid == ntid
-                        yield Record(oid, tid, rec[8:])
-                    kv = transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
+        for oid in self.oids:
+            with self.storage.txn(db.DB_TXN_SNAPSHOT) as txn:
+                with self.storage.cursor(self.storage.data, txn) as data:
+                    kr = data.get(oid, ntid, flags=db.DB_GET_BOTH_RANGE)
+                    if kr is None:
+                        kr = data.get(oid, flags=db.DB_SET)
+                    doid, rec = kr
+                    assert doid == oid
+                    dntid = rec[:8]
+                    assert dntid == ntid, (tid, ntid, dntid)
+                    yield Record(oid, tid, rec[8:])
 
 class Record:
     def __init__(self, oid, tid, data):
@@ -898,3 +855,40 @@
         thread.join(*args)
 
     return join
+
+
+class ObjectLog:
+    # Log of pickleable object data, kept in memory
+    # when small and spilled to a temp file otherwise.
+
+    max_mem = 1<<20 # Most transactions are a few K or less
+    in_memory = True
+
+    def __init__(self):
+        self._file = cStringIO.StringIO()
+        self.close = self._file.close
+        self._size = 0
+
+    def __call__(self, *data):
+        data = cPickle.dumps(data, 1)
+        ldata = len(data)
+        size = self._size = self._size + ldata + 4
+        if self.in_memory and size > self.max_mem:
+            newfile = tempfile.TemporaryFile()
+            self._file.seek(0)
+            newfile.write(self._file.read())
+            self._file = newfile
+            self.close = self._file.close
+            self.in_memory = False
+        self._file.write(struct.pack(">I", ldata))
+        self._file.write(data)
+
+    def __iter__(self):
+        file = self._file
+        file.seek(0)
+        while 1:
+            l = file.read(4)
+            if not l:
+                break
+            l = struct.unpack(">I", l)[0]
+            yield cPickle.loads(file.read(l))
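

For reference, here is a minimal usage sketch of the new ObjectLog
(an illustration, not part of the commit; the values are
placeholders).  The log is appended to by calling it and replayed by
iterating it, as tpc_begin and tpc_vote do above:

    log = ObjectLog()
    log('\x00' * 8, ' ', {'user_name': 'jim'})  # header, as in tpc_begin
    log('\x00' * 8, 'record-bytes')             # one entry per store
    entries = iter(log)
    tid, status, ext = entries.next()           # header back out
    for oid, record in entries:                 # remaining entries
        print repr(oid), repr(record)           # e.g. data.put(oid, record)
    log.close()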


