[Checkins] SVN: zc.bsddbstorage/branches/dev/s Checkpoint

Jim Fulton jim at zope.com
Fri Nov 13 13:04:23 EST 2009


Log message for revision 105629:
  Checkpoint
  

Changed:
  U   zc.bsddbstorage/branches/dev/setup.py
  U   zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py

-=-
Modified: zc.bsddbstorage/branches/dev/setup.py
===================================================================
--- zc.bsddbstorage/branches/dev/setup.py	2009-11-13 16:54:33 UTC (rev 105628)
+++ zc.bsddbstorage/branches/dev/setup.py	2009-11-13 18:04:22 UTC (rev 105629)
@@ -13,7 +13,7 @@
 ##############################################################################
 name, version = 'zc.bsddbstorage', '0'
 
-install_requires = ['setuptools', 'bsddb3']
+install_requires = ['setuptools', 'bsddb3', 'ZODB3']
 extras_require = dict(test=['zope.testing'])
 
 entry_points = """

Modified: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
===================================================================
--- zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py	2009-11-13 16:54:33 UTC (rev 105628)
+++ zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py	2009-11-13 18:04:22 UTC (rev 105629)
@@ -13,18 +13,67 @@
 ##############################################################################
 
 from bsddb3 import db
+from ZODB.utils import p64, u64, z64
+import cPickle
+import marshal
 import os
+import tempfile
+import threading
+import time
+import zc.lockfile
+import ZODB
+import ZODB.blob
+import ZODB.ConflictResolution
+import ZODB.interfaces
 import ZODB.POSException
 import ZODB.TimeStamp
+import zope.interface
 
+def n64(tid):
+    return p64(868082074056920076L-u64(tid))
 
-class BSDDBStorage:
+class BSDDBStorage(
+    ZODB.blob.BlobStorageMixin,
+    ZODB.ConflictResolution.ConflictResolvingStorage,
+    ):
 
-    def __init__(self, envpath):
+    zope.interface.implements(
+        ZODB.interfaces.IStorage,
+        ZODB.interfaces.IStorageRestoreable,
+#         ZODB.interfaces.IStorageIteration,
+#         ZODB.interfaces.IStorageCurrentRecordIteration,
+        ZODB.interfaces.IExternalGC,
+        )
+
+    def __init__(self, envpath, blob_dir=None, pack=3*86400,
+                 create=False, read_only=False):
         self.__name__ = envpath
+        envpath = os.path.abspath(envpath)
+        if create:
+            if os.path.isdir(envpath):
+                ZODB.blob.remove_committed_dir(envpath)
+            if blob_dir and os.path.isdir(blob_dir):
+                ZODB.blob.remove_committed_dir(blob_dir)
+        self._pack = pack
+        self._read_only = read_only
         if not os.path.isdir(envpath):
             os.mkdir(envpath)
 
+        if not read_only:
+            # Create the lock file
+            self._lock_file = zc.lockfile.LockFile(
+                os.path.join(envpath, 'zodb.lock'))
+
+        if blob_dir:
+            blob_dir = os.path.abspath(blob_dir)
+            self._blob_init(blob_dir)
+            zope.interface.alsoProvides(self,
+                                        ZODB.interfaces.IBlobStorageRestoreable)
+        else:
+            self.blob_dir = None
+            self._blob_init_no_blobs()
+
+
         self.env = db.DBEnv()
         self.env.open(envpath,
                       db.DB_INIT_LOCK | db.DB_INIT_LOG | db.DB_INIT_MPOOL |
@@ -33,7 +82,7 @@
 
         # data: {oid -> [tid+data]}
         self.data = db.DB(self.env)
-        self.data.set_flags(db.DB_DUP)
+        self.data.set_flags(db.DB_DUPSORT)
         self.data.open('data', dbtype=db.DB_HASH,
                        flags=(db.DB_CREATE | db.DB_THREAD | db.DB_AUTO_COMMIT |
                               db.DB_MULTIVERSION),
@@ -42,7 +91,7 @@
 
         # transaction_oids: {tid->[oids]}
         self.transaction_oids = db.DB(self.env)
-        self.transaction_oids.set_flags(db.DB_DUP)
+        self.transaction_oids.set_flags(db.DB_DUPSORT)
         self.transaction_oids.open('transaction_oids', dbtype=db.DB_BTREE,
                                    flags=(db.DB_CREATE | db.DB_THREAD |
                                           db.DB_AUTO_COMMIT |
@@ -51,56 +100,87 @@
 
         # transactions: {tid ->transaction_pickle}
         self.transactions = db.DB(self.env)
-        self.transaction.open('transactions', dbtype=db.DB_BTREE,
-                              flags=(db.DB_CREATE | db.DB_THREAD |
-                                     db.DB_AUTO_COMMIT | db.DB_MULTIVERSION),
-                              )
+        self.transactions.open('transactions', dbtype=db.DB_BTREE,
+                               flags=(db.DB_CREATE | db.DB_THREAD |
+                                      db.DB_AUTO_COMMIT | db.DB_MULTIVERSION),
+                               )
 
+        # Misc info:
+        # pack-trans
+        self.misc = db.DB(self.env)
+        self.misc.open('misc', dbtype=db.DB_HASH,
+                       flags=(db.DB_CREATE | db.DB_THREAD | db.DB_AUTO_COMMIT |
+                              db.DB_MULTIVERSION),
+                       )
+
+        t = time.time()
+        t = self._ts = ZODB.TimeStamp.TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
+        self._tid = repr(t)
+        self._transaction = None
+
+        self._commit_lock = threading.Lock()
+
     def txn(self, flags=0):
         return TransactionContext(self.env.txn_begin(flags=flags))
 
-    def cursor(self, db, txn=None, flags=0):
-        return CursorContext(self.db.cursor(txn, flags))
+    def cursor(self, database, txn=None, flags=0):
+        return CursorContext(database.cursor(txn, flags))
 
     def close(self):
         self.data.close()
         self.transaction_oids.close()
         self.transactions.close()
+        self.env.close()
+        if not self._read_only:
+            self._lock_file.close()
 
     def getName(self):
         return self.__name__
 
     def getSize(self):
-        return os.stat(self.datapath)
+        return os.stat(self.datapath).st_size
 
     def _history_entry(self, record, txn):
-        tid = record[:8]
+        tid = n64(record[:8])
         transaction = cPickle.loads(self.transactions.get(tid, txn=txn))
         transaction.update(size=len(record-8))
 
     def history(self, oid, size=1):
         with self.txn(db.DB_TXN_SNAPSHOT) as txn:
             with self.cursor(self.data, txn) as cursor:
-                k, record = cursor.get(oid, db.DB_PREV)
-                if k != oid or len(record) == 8:
+                kv = cursor.get(oid, db.DB_SET)
+                if kv is None:
                     raise ZODB.POSException.POSKeyError(oid)
-
-                result = [_history_entry(record)]
-                while len(result) < size):
-                    kv = cursor.get(oid, db.PREV_DUP)
+                k, record = kv
+                if len(record) == 8:
+                    raise ZODB.POSException.POSKeyError(oid)
+                result = [self._history_entry(record, txn)]
+                while len(result) < size:
+                    kv = cursor.get(oid, db.DB_NEXT_DUP)
                     if kv is None:
                         break
-                    result.append(_history_entry(kb[1])
+                    result.append(self._history_entry(kv[1], txn))
 
-                cursor.close()
-                return result
+            return result
 
     def isReadOnly(self):
-        return False
+        return self._read_only
 
+    def iterator(self, start=z64, stop=None):
+        with self.txn(db.DB_READ_COMMITTED) as txn:
+            with self.cursor(self.transactions, txn) as transactions:
+                kv = transactions.get(start, flags=db.DB_SET_RANGE)
+                while kv:
+                    tid, ext = kv
+                    yield Records(self, txn, tid, ext)
+                kv = transactions.get(tid, flags=db.DB_NEXT)
+
+#     def record_iternext(next=None):
+#         pass # XXX
+
     def lastTransaction(self):
         with self.txn() as txn:
-            with self.cursor(self.data, txn) as cursor:
+            with self.cursor(self.transactions, txn) as cursor:
                 return cursor.get(db.DB_LAST)[0]
 
     def __len__(self):
@@ -109,245 +189,304 @@
     def load(self, oid, version=''):
         with self.txn() as txn:
             with self.cursor(self.data, txn) as cursor:
-                k, record = cursor.get(oid, db.DB_PREV)
-                if k != oid or len(record) == 8:
-                    raise ZODB.POSException.POSKeyError(oid)
-                return result[8:], result[:8]
+                kv = cursor.get(oid, db.DB_SET)
+                if kv:
+                    record = kv[1]
+                    data = record[8:]
+                    if data:
+                        return data, n64(record[:8])
 
+                raise ZODB.POSException.POSKeyError(oid)
 
     def loadBefore(self, oid, tid):
         with self.txn(db.DB_TXN_SNAPSHOT) as txn:
             with self.cursor(self.data, txn) as cursor:
-                k, record = cursor.get(oid, db.DB_PREV)
-                if k != oid or len(record) == 8:
+                kr = cursor.get(oid, db.DB_SET)
+                if kr is None:
                     raise ZODB.POSException.POSKeyError(oid)
+                record = kr[1]
+                if kr[0] != oid or len(record) == 8:
+                    raise ZODB.POSException.POSKeyError(oid)
                 nexttid = None
-                rtid = record[:8]
+                rtid = n64(record[:8])
                 while rtid >= tid:
-                    krecord = cursor.get(oid, db.PREV_DUP)
+                    krecord = cursor.get(oid, flags=db.DB_NEXT_DUP)
                     if krecord is None:
                         return None
                     nexttid = rtid
                     record = krecord[1]
-                    rtid = record[:8]
+                    rtid = n64(record[:8])
 
                 return record[8:], rtid, nexttid
 
-    def loadSerial(oid, serial):
+    def loadSerial(self, oid, serial):
+        serial = n64(serial)
         with self.txn(db.DB_TXN_SNAPSHOT) as txn:
             with self.cursor(self.data, txn) as cursor:
-                k, record = cursor.get(oid, db.DB_PREV)
-                if k != oid or len(record) == 8:
-                    raise ZODB.POSException.POSKeyError(oid)
-                nexttid = None
-                rtid = record[:8]
-                while rtid >= tid:
-                    krecord = cursor.get(oid, db.PREV_DUP)
-                    if krecord is None:
-                        return None
-                    nexttid = rtid
-                    record = krecord[1]
-                    rtid = record[:8]
+                kr = cursor.get(oid, serial, flags=db.DB_GET_BOTH_RANGE)
+                if kr:
+                    k, record = kr
+                    if k == oid and record[:8] == serial:
+                        data = record[8:]
+                        if data:
+                            return data
 
-                return record[8:], rtid, nexttid
+                raise ZODB.POSException.POSKeyError(oid, serial)
 
+    def new_oid(self):
+        with self.txn() as txn:
+            oid = p64(u64(self.misc.get('oid', z64, txn))+1)
+            self.misc.put('oid', oid, txn)
+            return oid
 
+    def pack(self, pack_time=None, referencesf=None):
+        if pack_time is None:
+            pack_time = time.time()-self._pack
+        pack_tid = timetime2tid(pack_time)
+        while self._pack1(pack_tid):
+            pass
+        self._remove_empty_notlast_blob_directories(self.blob_dir)
 
-        """Load the object record for the give transaction id
+    def _pack1(self, pack_tid):
+        # Pack one transaction. Get the next transaction we haven't yet
+        # packed and stop if it is > pack_tid.
+        # This is done as a transaction.
+        removed_blobs = []
+        with self.txn(db.DB_TXN_SNAPSHOT) as txn:
+            # Pick a tid just past the last one we packed:
+            tid = p64(u64(self.misc.get('pack', z64, txn=txn))+1)
+            if tid > pack_tid:
+                return None
+            with self.cursor(self.transaction_oids, txn) as transaction_oids:
+                # Find the smallest tid >= the one we picked
+                kv = transaction_oids.get(tid, flags=db.DB_SET_RANGE)
+                if kv is None:
+                    return None
 
-        If a matching data record can be found, it is returned,
-        otherwise, POSKeyError is raised.
-        """
+                tid, oid = kv
+                ntid = n64(tid)
 
-#     The following two methods are effectively part of the interface,
-#     as they are generally needed when one storage wraps
-#     another. This deserves some thought, at probably debate, before
-#     adding them.
-#
-#     def _lock_acquire():
-#         """Acquire the storage lock
-#         """
+                # Iterate ober the oids for this tid and pack each one.
+                # Note that we treat the tid we're looking at as the
+                # pack time. That is, as we look at each transaction,
+                # we pack to that time.  This way, we can pack
+                # *very* incrementally.
+                while 1:
 
-#     def _lock_release():
-#         """Release the storage lock
-#         """
+                    # Find the first record for the oid whos tid is <=
+                    # the pack time. (we use negative tids, so >=)
+                    # This is the current record as of the pack time
+                    with self.cursor(self.data, txn) as data:
+                        doid, record = data.get(oid, ntid,
+                                              flags=db.DB_GET_BOTH_RANGE)
+                        assert doid == oid
+                        ndtid = record[:8]
+                        assert ndtid >= ntid
+                        if len(record) == 8:
+                            # delete record, so we can delete the record,
+                            # which with the deletions below, will
+                            # delete the oid
+                            data.delete()
+                            deleted_oid = True
+                        else:
+                            deleted_oid = False
 
-    def new_oid():
-        """Allocate a new object id.
+                        # OK, we have the current record as of the tid,
+                        # we can remove later ones
 
-        The object id returned is reserved at least as long as the
-        storage is opened.
+                        while 1:
+                            kv = data.get(oid, ntid, flags=db.DB_NEXT_DUP)
+                            if kv is None:
+                                break
+                            doid, record = kv
+                            assert doid == oid
+                            data.delete()
+                            ndtid = record[:8]
+                            dtid = n64(ndtid)
+                            pickle = record[8:]
+                            if (self.blob_dir and
+                                ZODB.blob.is_blob_record(pickle)
+                                ):
+                                if deleted_oid:
+                                    if ((not removed_blobs) or
+                                        (removed_blobs[-1] != oid)):
+                                        removed_blobs.append(oid)
+                                else:
+                                    removed_blobs.append(oid+dtid)
+                            # clean up transaction_oids and
+                            # maybe transactions
+                            self._pack_remove_oid_tid(dtid, oid, txn)
 
-        The return value is a string.
-        """
+                    # continue iterating over the oids for this tid
+                    kv = self.transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
+                    if kv is None:
+                        break
+                    assert kv[0] == tid
+                    oid = kv[1]
 
-    def pack(pack_time, referencesf):
-        """Pack the storage
+            self.misc.put('pack', tid, txn=txn)
 
-        It is up to the storage to interpret this call, however, the
-        general idea is that the storage free space by:
+        if removed_blobs:
+            self._remove_blob_files_tagged_for_removal_during_pack(
+                removed_blobs)
 
-        - discarding object revisions that were old and not current as of the
-          given pack time.
+        return tid
 
-        - garbage collecting objects that aren't reachable from the
-          root object via revisions remaining after discarding
-          revisions that were not current as of the pack time.
+    def _pack_remove_oid_tid(self, tid, oid, txn):
+        with self.cursor(self.transaction_oids, txn) as transaction_oids:
+            toid, ttid = transaction_oids.get(oid, tid,
+                                              flags=db.DB_GET_BOTH_RANGE)
+            if toid != oid or ttid != tid:
+                raise AssertionError("Bad oid+tid lookup",
+                                     oid, tid, toid, ttid)
+            transaction_oids.delete()
+            # OK, we deleted the record. Maybe it was the last one. Try to get
+            # the first, and, if we can't, then delete the transaction record.
+            kv = transaction_oids.get(tid, flags=db.DB_SET)
+            if kv is None:
+                # OK, no more oids for this tid, remive it from transactions
+                self.transactions.delete(tid, txn)
 
-        The pack time is given as a UTC time in seconds since the
-        epoch.
+    def _remove_blob_files_tagged_for_removal_during_pack(self, removed):
+        for oid in removed:
+            if len(oid) == 8:
+                # oid is garbage, re/move dir
+                path = self.fshelper.getPathForOID(oid)
+                if os.path.exists(path):
+                    ZODB.blob.remove_committed_dir(path)
+            else:
+                tid = oid[8:]
+                oid = oid[:8]
+                path = self.fshelper.getBlobFilename(oid, tid)
+                if os.path.exists(path):
+                    ZODB.blob.remove_committed(path)
 
-        The second argument is a function that should be used to
-        extract object references from database records.  This is
-        needed to determine which objects are referenced from object
-        revisions.
-        """
+    def _remove_empty_notlast_blob_directories(self, dir):
+        # clean up empty blob dirs. This relies on the
+        # fact that oids are allocates sequentially
+        paths = filter(os.path.isdir,
+                       [os.path.join(dir, name)
+                        for name in sorted(os.listdor(dir))
+                        if name.lower().startswith('0x')])
+        if not paths:
+            return
+        self._remove_empty_notlast_blob_directories(paths.pop())
+        for path in paths:
+            self._remove_empty_recursively_directory(path)
 
-    def registerDB(db):
-        """Register an IStorageDB.
+    def _remove_empty_recursively_directory(self, dir):
+        if not os.path.isdir(dir):
+            return False
+        for name in os.listdir(dir):
+            if not self._remove_empty_recursively_directory(
+                os.path.join(dir, name)):
+                return False
+        os.rmdir(dir)
+        return True
 
-        Note that, for historical reasons, an implementation may
-        require a second argument, however, if required, the None will
-        be passed as the second argument.
-        """
+    def registerDB(self, db):
+        pass
 
-    def sortKey():
-        """Sort key used to order distributed transactions
+    def sortKey(self):
+        return self.__name__
 
-        When a transaction involved multiple storages, 2-phase commit
-        operations are applied in sort-key order.  This must be unique
-        among storages used in a transaction. Obviously, the storage
-        can't assure this, but it should construct the sort key so it
-        has a reasonable chance of being unique.
+    def store(self, oid, oldserial, data, version, transaction):
+        assert not version
+        if transaction is not self._transaction:
+            raise ZODB.POSException.StorageTransactionError(self, transaction)
+        committed_tid = self.data.get(oid, dlen=8, doff=0)
+        if committed_tid is not None:
+            committed_tid = n64(committed_tid)
+            if committed_tid != oldserial:
+                rdata = self.tryToResolveConflict(oid, committed_tid,
+                                                  oldserial, data)
+                if rdata is None:
+                    raise ZODB.POSException.ConflictError(
+                        oid=oid, serials=(committed_tid, oldserial),
+                        data=data)
+                else:
+                    data = rdata
 
-        The result must be a string.
-        """
+        marshal.dump((oid, n64(self._tid)+data), self._log_file)
 
-    def store(oid, serial, data, version, transaction):
-        """Store data for the object id, oid.
+    def restore(self, oid, serial, data, version, prev_txn, transaction):
+        assert not version
+        if transaction is not self._transaction:
+            raise ZODB.POSException.StorageTransactionError(self, transaction)
 
-        Arguments:
+        marshal.dump((oid, n64(serial)+data), self._log_file)
 
-        oid
-            The object identifier.  This is either a string
-            consisting of 8 nulls or a string previously returned by
-            new_oid. 
+    def deleteObject(self, oid, oldserial, transaction):
+        if transaction is not self._transaction:
+            raise ZODB.POSException.StorageTransactionError(self, transaction)
+        committed_tid = self.data.get(oid, dlen=8, doff=0)
+        if committed_tid is not None and committed_tid != oldserial:
+            raise ZODB.POSException.ConflictError(
+                oid=oid, serials=(committed_tid, oldserial))
 
-        serial
-            The serial of the data that was read when the object was
-            loaded from the database.  If the object was created in
-            the current transaction this will be a string consisting
-            of 8 nulls.
+        marshal.dump((oid, n64(self._tid)), self._log_file)
 
-        data
-            The data record. This is opaque to the storage.
+    def tpc_abort(self, transaction):
+        self._txn.abort()
+        self._txn = self._transaction = None
+        self._blob_tpc_abort()
+        self._commit_lock.release()
 
-        version
-            This must be an empty string. It exists for backward compatibility.
+    def tpc_begin(self, transaction, tid=None, status=' '):
+        if self._read_only:
+            raise ZODB.POSException.ReadOnlyError()
 
-        transaction
-            A transaction object.  This should match the current
-            transaction for the storage, set by tpc_begin.
+        self._commit_lock.acquire()
+        if self._transaction is not None and transaction != self._transaction:
+            self._commit_lock.release()
+            raise ZODB.POSException.StorageTransactionError(self, transaction)
 
-        The new serial for the object is returned, but not necessarily
-        immediately.  It may be returned directly, or on a subsequent
-        store or tpc_vote call.
+        self._transaction = transaction
 
-        The return value may be:
+        ext = transaction._extension.copy()
+        ext['user'] = transaction.user
+        ext['description'] = transaction.description
+        ext = cPickle.dumps(ext, 1)
 
-        - None
+        if tid is None:
+            now = time.time()
+            t = ZODB.TimeStamp.TimeStamp(
+                *(time.gmtime(now)[:5] + (now % 60,)))
+            self._ts = t = t.laterThan(self._ts)
+            self._tid = tid = repr(t)
+        else:
+            self._ts = ZODB.TimeStamp.TimeStamp(tid)
+            self._tid = tid
 
-        - A new serial (string) for the object, or
+        fd, path = tempfile.mkstemp('bsddb')
+        self._log_file = open(path, 'r+b')
+        os.close(fd)
+        marshal.dump((tid, ext), self._log_file)
 
-        - An iterable of object-id and serial pairs giving new serials
-          for objects.
+    def tpc_finish(self, transaction, func = lambda tid: None):
+        self._txn.commit()
+        self._txn = self._transaction = None
+        self._blob_tpc_finish()
+        self._commit_lock.release()
 
-        A serial, returned as a string or in a sequence of oid/serial
-        pairs, may be the special value
-        ZODB.ConflictResolution.ResolvedSerial to indicate that a
-        conflict occured and that the object should be invalidated.
+    _transaction_id_suffix = 'x' * (db.DB_GID_SIZE - 8)
+    def tpc_vote(self, transaction):
+        self._txn = txn = self.env.txn_begin()
+        self._log_file.seek(0)
+        tid, ext = marshal.load(self._log_file)
+        self.transactions.put(tid, ext, txn=txn)
+        for oid, record in marhal_iterate(self._log_file):
+            self.data.put(oid, record, txn=txn)
+            self.transaction_oids.put(tid, oid, txn=txn)
+        txn.prepare(self._tid+self._transaction_id_suffix)
 
-        Several different exceptions may be raised when an error occurs.
+def marhal_iterate(f):
+    while 1:
+        try:
+            yield marshal.load(f)
+        except EOFError:
+            break
 
-        ConflictError
-          is raised when serial does not match the most recent serial
-          number for object oid and the conflict was not resolved by
-          the storage.
-
-        StorageTransactionError
-          is raised when transaction does not match the current
-          transaction.
-
-        StorageError or, more often, a subclass of it
-          is raised when an internal error occurs while the storage is
-          handling the store() call.
-        
-        """
-
-    def tpc_abort(transaction):
-        """Abort the transaction.
-
-        Any changes made by the transaction are discarded.
-
-        This call is ignored is the storage is not participating in
-        two-phase commit or if the given transaction is not the same
-        as the transaction the storage is commiting.
-        """
-
-    def tpc_begin(transaction):
-        """Begin the two-phase commit process.
-
-        If storage is already participating in a two-phase commit
-        using the same transaction, the call is ignored.
-
-        If the storage is already participating in a two-phase commit
-        using a different transaction, the call blocks until the
-        current transaction ends (commits or aborts).
-        """
-
-    def tpc_finish(transaction, func = lambda tid: None):
-        """Finish the transaction, making any transaction changes permanent.
-
-        Changes must be made permanent at this point.
-
-        This call is ignored if the storage isn't participating in
-        two-phase commit or if it is committing a different
-        transaction.  Failure of this method is extremely serious.
-
-        The second argument is a call-back function that must be
-        called while the storage transaction lock is held.  It takes
-        the new transaction id generated by the transaction.
-
-        """
-
-    def tpc_vote(transaction):
-        """Provide a storage with an opportunity to veto a transaction
-
-        This call is ignored if the storage isn't participating in
-        two-phase commit or if it is commiting a different
-        transaction.  Failure of this method is extremely serious.
-
-        If a transaction can be committed by a storage, then the
-        method should return.  If a transaction cannot be committed,
-        then an exception should be raised.  If this method returns
-        without an error, then there must not be an error if
-        tpc_finish or tpc_abort is called subsequently.
-
-        The return value can be either None or a sequence of object-id
-        and serial pairs giving new serials for objects who's ids were
-        passed to previous store calls in the same transaction.
-        After the tpc_vote call, new serials must have been returned,
-        either from tpc_vote or store for objects passed to store.
-
-        A serial returned in a sequence of oid/serial pairs, may be
-        the special value ZODB.ConflictResolution.ResolvedSerial to
-        indicate that a conflict occured and that the object should be
-        invalidated.
-
-        """
-
-
-
 class TransactionContext(object):
 
     def __init__(self, txn):
@@ -372,3 +511,55 @@
 
     def __exit__(self, t, v, tb):
         self.cursor.close()
+
+def timetime2tid(timetime):
+    return repr(ZODB.TimeStamp.TimeStamp(
+        *time.gmtime(timetime)[:5]
+        +(time.gmtime(timetime)[5]+divmod(timetime,1)[1],)
+        ))
+
+def DB(path, blob_dir=None, pack=3*86400,
+       read_only=False, create=False,
+       **kw):
+    return ZODB.DB(BSDDBStorage(path, blob_dir, pack, read_only), **kw)
+
+class Records:
+
+    def __init__(self, storage, txn, tid, ext):
+        self.storage = storage
+        self._txn = txn
+        self.tid = tid
+        ext = cPickle.loads(ext)
+        self.user = ext.pop('user', '')
+        self.description = ext.pop('description', '')
+        self.extension = ext
+
+    def __iter__(self):
+        tid = self.tid
+        ntid = n64(tid)
+        with self.storage.cursor(self.storage.transaction_oids, self._txn
+                                 ) as transaction_oids:
+            kv = transaction_oids.get(tid, flags=db.DB_SET)
+            while kv is not None:
+                ttid, oid = kv
+                assert ttid == tid
+                with self.storage.cursor(self.storage.data, self._txn) as data:
+                    doid, rec = data.get(oid, ntid, flags=db.DB_GET_BOTH_RANGE)
+                    assert doid == oid
+                    dntid = n64(rec[:8])
+                    assert dntid == ntid
+                    return Record(oid, tid, rec[8:])
+                kv = transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
+
+class Record:
+    def __init__(self, oid, tid, data):
+        self.oid = oid
+        self.tid = tid
+        self.data = data
+
+    version = ''
+    data_txn = None
+
+    def __repr__(self):
+        return repr((u64(self.oid), str(ZODB.TimeStamp.TimeStamp(self.tid)),
+                     self.data))



More information about the checkins mailing list