[Checkins] SVN: zc.bsddbstorage/branches/dev/s Checkpoint
Jim Fulton
jim at zope.com
Fri Nov 13 13:04:23 EST 2009
Log message for revision 105629:
Checkpoint
Changed:
U zc.bsddbstorage/branches/dev/setup.py
U zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
-=-
Modified: zc.bsddbstorage/branches/dev/setup.py
===================================================================
--- zc.bsddbstorage/branches/dev/setup.py 2009-11-13 16:54:33 UTC (rev 105628)
+++ zc.bsddbstorage/branches/dev/setup.py 2009-11-13 18:04:22 UTC (rev 105629)
@@ -13,7 +13,7 @@
##############################################################################
name, version = 'zc.bsddbstorage', '0'
-install_requires = ['setuptools', 'bsddb3']
+install_requires = ['setuptools', 'bsddb3', 'ZODB3']
extras_require = dict(test=['zope.testing'])
entry_points = """
Modified: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
===================================================================
--- zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py 2009-11-13 16:54:33 UTC (rev 105628)
+++ zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py 2009-11-13 18:04:22 UTC (rev 105629)
@@ -13,18 +13,67 @@
##############################################################################
from bsddb3 import db
+from ZODB.utils import p64, u64, z64
+import cPickle
+import marshal
import os
+import tempfile
+import threading
+import time
+import zc.lockfile
+import ZODB
+import ZODB.blob
+import ZODB.ConflictResolution
+import ZODB.interfaces
import ZODB.POSException
import ZODB.TimeStamp
+import zope.interface
+def n64(tid):
+ return p64(868082074056920076L-u64(tid))
-class BSDDBStorage:
+class BSDDBStorage(
+ ZODB.blob.BlobStorageMixin,
+ ZODB.ConflictResolution.ConflictResolvingStorage,
+ ):
- def __init__(self, envpath):
+ zope.interface.implements(
+ ZODB.interfaces.IStorage,
+ ZODB.interfaces.IStorageRestoreable,
+# ZODB.interfaces.IStorageIteration,
+# ZODB.interfaces.IStorageCurrentRecordIteration,
+ ZODB.interfaces.IExternalGC,
+ )
+
+ def __init__(self, envpath, blob_dir=None, pack=3*86400,
+ create=False, read_only=False):
self.__name__ = envpath
+ envpath = os.path.abspath(envpath)
+ if create:
+ if os.path.isdir(envpath):
+ ZODB.blob.remove_committed_dir(envpath)
+ if blob_dir and os.path.isdir(blob_dir):
+ ZODB.blob.remove_committed_dir(blob_dir)
+ self._pack = pack
+ self._read_only = read_only
if not os.path.isdir(envpath):
os.mkdir(envpath)
+ if not read_only:
+ # Create the lock file
+ self._lock_file = zc.lockfile.LockFile(
+ os.path.join(envpath, 'zodb.lock'))
+
+ if blob_dir:
+ blob_dir = os.path.abspath(blob_dir)
+ self._blob_init(blob_dir)
+ zope.interface.alsoProvides(self,
+ ZODB.interfaces.IBlobStorageRestoreable)
+ else:
+ self.blob_dir = None
+ self._blob_init_no_blobs()
+
+
self.env = db.DBEnv()
self.env.open(envpath,
db.DB_INIT_LOCK | db.DB_INIT_LOG | db.DB_INIT_MPOOL |
@@ -33,7 +82,7 @@
# data: {oid -> [tid+data]}
self.data = db.DB(self.env)
- self.data.set_flags(db.DB_DUP)
+ self.data.set_flags(db.DB_DUPSORT)
self.data.open('data', dbtype=db.DB_HASH,
flags=(db.DB_CREATE | db.DB_THREAD | db.DB_AUTO_COMMIT |
db.DB_MULTIVERSION),
@@ -42,7 +91,7 @@
# transaction_oids: {tid->[oids]}
self.transaction_oids = db.DB(self.env)
- self.transaction_oids.set_flags(db.DB_DUP)
+ self.transaction_oids.set_flags(db.DB_DUPSORT)
self.transaction_oids.open('transaction_oids', dbtype=db.DB_BTREE,
flags=(db.DB_CREATE | db.DB_THREAD |
db.DB_AUTO_COMMIT |
@@ -51,56 +100,87 @@
# transactions: {tid ->transaction_pickle}
self.transactions = db.DB(self.env)
- self.transaction.open('transactions', dbtype=db.DB_BTREE,
- flags=(db.DB_CREATE | db.DB_THREAD |
- db.DB_AUTO_COMMIT | db.DB_MULTIVERSION),
- )
+ self.transactions.open('transactions', dbtype=db.DB_BTREE,
+ flags=(db.DB_CREATE | db.DB_THREAD |
+ db.DB_AUTO_COMMIT | db.DB_MULTIVERSION),
+ )
+ # Misc info:
+ # pack-trans
+ self.misc = db.DB(self.env)
+ self.misc.open('misc', dbtype=db.DB_HASH,
+ flags=(db.DB_CREATE | db.DB_THREAD | db.DB_AUTO_COMMIT |
+ db.DB_MULTIVERSION),
+ )
+
+ t = time.time()
+ t = self._ts = ZODB.TimeStamp.TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
+ self._tid = repr(t)
+ self._transaction = None
+
+ self._commit_lock = threading.Lock()
+
def txn(self, flags=0):
return TransactionContext(self.env.txn_begin(flags=flags))
- def cursor(self, db, txn=None, flags=0):
- return CursorContext(self.db.cursor(txn, flags))
+ def cursor(self, database, txn=None, flags=0):
+ return CursorContext(database.cursor(txn, flags))
def close(self):
self.data.close()
self.transaction_oids.close()
self.transactions.close()
+ self.env.close()
+ if not self._read_only:
+ self._lock_file.close()
def getName(self):
return self.__name__
def getSize(self):
- return os.stat(self.datapath)
+ return os.stat(self.datapath).st_size
def _history_entry(self, record, txn):
- tid = record[:8]
+ tid = n64(record[:8])
transaction = cPickle.loads(self.transactions.get(tid, txn=txn))
transaction.update(size=len(record-8))
def history(self, oid, size=1):
with self.txn(db.DB_TXN_SNAPSHOT) as txn:
with self.cursor(self.data, txn) as cursor:
- k, record = cursor.get(oid, db.DB_PREV)
- if k != oid or len(record) == 8:
+ kv = cursor.get(oid, db.DB_SET)
+ if kv is None:
raise ZODB.POSException.POSKeyError(oid)
-
- result = [_history_entry(record)]
- while len(result) < size):
- kv = cursor.get(oid, db.PREV_DUP)
+ k, record = kv
+ if len(record) == 8:
+ raise ZODB.POSException.POSKeyError(oid)
+ result = [self._history_entry(record, txn)]
+ while len(result) < size:
+ kv = cursor.get(oid, db.DB_NEXT_DUP)
if kv is None:
break
- result.append(_history_entry(kb[1])
+ result.append(self._history_entry(kv[1], txn))
- cursor.close()
- return result
+ return result
def isReadOnly(self):
- return False
+ return self._read_only
+ def iterator(self, start=z64, stop=None):
+ with self.txn(db.DB_READ_COMMITTED) as txn:
+ with self.cursor(self.transactions, txn) as transactions:
+ kv = transactions.get(start, flags=db.DB_SET_RANGE)
+ while kv:
+ tid, ext = kv
+ yield Records(self, txn, tid, ext)
+ kv = transactions.get(tid, flags=db.DB_NEXT)
+
+# def record_iternext(next=None):
+# pass # XXX
+
def lastTransaction(self):
with self.txn() as txn:
- with self.cursor(self.data, txn) as cursor:
+ with self.cursor(self.transactions, txn) as cursor:
return cursor.get(db.DB_LAST)[0]
def __len__(self):
@@ -109,245 +189,304 @@
def load(self, oid, version=''):
with self.txn() as txn:
with self.cursor(self.data, txn) as cursor:
- k, record = cursor.get(oid, db.DB_PREV)
- if k != oid or len(record) == 8:
- raise ZODB.POSException.POSKeyError(oid)
- return result[8:], result[:8]
+ kv = cursor.get(oid, db.DB_SET)
+ if kv:
+ record = kv[1]
+ data = record[8:]
+ if data:
+ return data, n64(record[:8])
+ raise ZODB.POSException.POSKeyError(oid)
def loadBefore(self, oid, tid):
with self.txn(db.DB_TXN_SNAPSHOT) as txn:
with self.cursor(self.data, txn) as cursor:
- k, record = cursor.get(oid, db.DB_PREV)
- if k != oid or len(record) == 8:
+ kr = cursor.get(oid, db.DB_SET)
+ if kr is None:
raise ZODB.POSException.POSKeyError(oid)
+ record = kr[1]
+ if kr[0] != oid or len(record) == 8:
+ raise ZODB.POSException.POSKeyError(oid)
nexttid = None
- rtid = record[:8]
+ rtid = n64(record[:8])
while rtid >= tid:
- krecord = cursor.get(oid, db.PREV_DUP)
+ krecord = cursor.get(oid, flags=db.DB_NEXT_DUP)
if krecord is None:
return None
nexttid = rtid
record = krecord[1]
- rtid = record[:8]
+ rtid = n64(record[:8])
return record[8:], rtid, nexttid
- def loadSerial(oid, serial):
+ def loadSerial(self, oid, serial):
+ serial = n64(serial)
with self.txn(db.DB_TXN_SNAPSHOT) as txn:
with self.cursor(self.data, txn) as cursor:
- k, record = cursor.get(oid, db.DB_PREV)
- if k != oid or len(record) == 8:
- raise ZODB.POSException.POSKeyError(oid)
- nexttid = None
- rtid = record[:8]
- while rtid >= tid:
- krecord = cursor.get(oid, db.PREV_DUP)
- if krecord is None:
- return None
- nexttid = rtid
- record = krecord[1]
- rtid = record[:8]
+ kr = cursor.get(oid, serial, flags=db.DB_GET_BOTH_RANGE)
+ if kr:
+ k, record = kr
+ if k == oid and record[:8] == serial:
+ data = record[8:]
+ if data:
+ return data
- return record[8:], rtid, nexttid
+ raise ZODB.POSException.POSKeyError(oid, serial)
+ def new_oid(self):
+ with self.txn() as txn:
+ oid = p64(u64(self.misc.get('oid', z64, txn))+1)
+ self.misc.put('oid', oid, txn)
+ return oid
+ def pack(self, pack_time=None, referencesf=None):
+ if pack_time is None:
+ pack_time = time.time()-self._pack
+ pack_tid = timetime2tid(pack_time)
+ while self._pack1(pack_tid):
+ pass
+ self._remove_empty_notlast_blob_directories(self.blob_dir)
- """Load the object record for the give transaction id
+ def _pack1(self, pack_tid):
+ # Pack one transaction. Get the next transaction we haven't yet
+ # packed and stop if it is > pack_tid.
+ # This is done as a transaction.
+ removed_blobs = []
+ with self.txn(db.DB_TXN_SNAPSHOT) as txn:
+ # Pick a tid just past the last one we packed:
+ tid = p64(u64(self.misc.get('pack', z64, txn=txn))+1)
+ if tid > pack_tid:
+ return None
+ with self.cursor(self.transaction_oids, txn) as transaction_oids:
+ # Find the smallest tid >= the one we picked
+ kv = transaction_oids.get(tid, flags=db.DB_SET_RANGE)
+ if kv is None:
+ return None
- If a matching data record can be found, it is returned,
- otherwise, POSKeyError is raised.
- """
+ tid, oid = kv
+ ntid = n64(tid)
-# The following two methods are effectively part of the interface,
-# as they are generally needed when one storage wraps
-# another. This deserves some thought, at probably debate, before
-# adding them.
-#
-# def _lock_acquire():
-# """Acquire the storage lock
-# """
+ # Iterate ober the oids for this tid and pack each one.
+ # Note that we treat the tid we're looking at as the
+ # pack time. That is, as we look at each transaction,
+ # we pack to that time. This way, we can pack
+ # *very* incrementally.
+ while 1:
-# def _lock_release():
-# """Release the storage lock
-# """
+ # Find the first record for the oid whos tid is <=
+ # the pack time. (we use negative tids, so >=)
+ # This is the current record as of the pack time
+ with self.cursor(self.data, txn) as data:
+ doid, record = data.get(oid, ntid,
+ flags=db.DB_GET_BOTH_RANGE)
+ assert doid == oid
+ ndtid = record[:8]
+ assert ndtid >= ntid
+ if len(record) == 8:
+ # delete record, so we can delete the record,
+ # which with the deletions below, will
+ # delete the oid
+ data.delete()
+ deleted_oid = True
+ else:
+ deleted_oid = False
- def new_oid():
- """Allocate a new object id.
+ # OK, we have the current record as of the tid,
+ # we can remove later ones
- The object id returned is reserved at least as long as the
- storage is opened.
+ while 1:
+ kv = data.get(oid, ntid, flags=db.DB_NEXT_DUP)
+ if kv is None:
+ break
+ doid, record = kv
+ assert doid == oid
+ data.delete()
+ ndtid = record[:8]
+ dtid = n64(ndtid)
+ pickle = record[8:]
+ if (self.blob_dir and
+ ZODB.blob.is_blob_record(pickle)
+ ):
+ if deleted_oid:
+ if ((not removed_blobs) or
+ (removed_blobs[-1] != oid)):
+ removed_blobs.append(oid)
+ else:
+ removed_blobs.append(oid+dtid)
+ # clean up transaction_oids and
+ # maybe transactions
+ self._pack_remove_oid_tid(dtid, oid, txn)
- The return value is a string.
- """
+ # continue iterating over the oids for this tid
+ kv = self.transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
+ if kv is None:
+ break
+ assert kv[0] == tid
+ oid = kv[1]
- def pack(pack_time, referencesf):
- """Pack the storage
+ self.misc.put('pack', tid, txn=txn)
- It is up to the storage to interpret this call, however, the
- general idea is that the storage free space by:
+ if removed_blobs:
+ self._remove_blob_files_tagged_for_removal_during_pack(
+ removed_blobs)
- - discarding object revisions that were old and not current as of the
- given pack time.
+ return tid
- - garbage collecting objects that aren't reachable from the
- root object via revisions remaining after discarding
- revisions that were not current as of the pack time.
+ def _pack_remove_oid_tid(self, tid, oid, txn):
+ with self.cursor(self.transaction_oids, txn) as transaction_oids:
+ toid, ttid = transaction_oids.get(oid, tid,
+ flags=db.DB_GET_BOTH_RANGE)
+ if toid != oid or ttid != tid:
+ raise AssertionError("Bad oid+tid lookup",
+ oid, tid, toid, ttid)
+ transaction_oids.delete()
+ # OK, we deleted the record. Maybe it was the last one. Try to get
+ # the first, and, if we can't, then delete the transaction record.
+ kv = transaction_oids.get(tid, flags=db.DB_SET)
+ if kv is None:
+ # OK, no more oids for this tid, remive it from transactions
+ self.transactions.delete(tid, txn)
- The pack time is given as a UTC time in seconds since the
- epoch.
+ def _remove_blob_files_tagged_for_removal_during_pack(self, removed):
+ for oid in removed:
+ if len(oid) == 8:
+ # oid is garbage, re/move dir
+ path = self.fshelper.getPathForOID(oid)
+ if os.path.exists(path):
+ ZODB.blob.remove_committed_dir(path)
+ else:
+ tid = oid[8:]
+ oid = oid[:8]
+ path = self.fshelper.getBlobFilename(oid, tid)
+ if os.path.exists(path):
+ ZODB.blob.remove_committed(path)
- The second argument is a function that should be used to
- extract object references from database records. This is
- needed to determine which objects are referenced from object
- revisions.
- """
+ def _remove_empty_notlast_blob_directories(self, dir):
+ # clean up empty blob dirs. This relies on the
+ # fact that oids are allocates sequentially
+ paths = filter(os.path.isdir,
+ [os.path.join(dir, name)
+ for name in sorted(os.listdor(dir))
+ if name.lower().startswith('0x')])
+ if not paths:
+ return
+ self._remove_empty_notlast_blob_directories(paths.pop())
+ for path in paths:
+ self._remove_empty_recursively_directory(path)
- def registerDB(db):
- """Register an IStorageDB.
+ def _remove_empty_recursively_directory(self, dir):
+ if not os.path.isdir(dir):
+ return False
+ for name in os.listdir(dir):
+ if not self._remove_empty_recursively_directory(
+ os.path.join(dir, name)):
+ return False
+ os.rmdir(dir)
+ return True
- Note that, for historical reasons, an implementation may
- require a second argument, however, if required, the None will
- be passed as the second argument.
- """
+ def registerDB(self, db):
+ pass
- def sortKey():
- """Sort key used to order distributed transactions
+ def sortKey(self):
+ return self.__name__
- When a transaction involved multiple storages, 2-phase commit
- operations are applied in sort-key order. This must be unique
- among storages used in a transaction. Obviously, the storage
- can't assure this, but it should construct the sort key so it
- has a reasonable chance of being unique.
+ def store(self, oid, oldserial, data, version, transaction):
+ assert not version
+ if transaction is not self._transaction:
+ raise ZODB.POSException.StorageTransactionError(self, transaction)
+ committed_tid = self.data.get(oid, dlen=8, doff=0)
+ if committed_tid is not None:
+ committed_tid = n64(committed_tid)
+ if committed_tid != oldserial:
+ rdata = self.tryToResolveConflict(oid, committed_tid,
+ oldserial, data)
+ if rdata is None:
+ raise ZODB.POSException.ConflictError(
+ oid=oid, serials=(committed_tid, oldserial),
+ data=data)
+ else:
+ data = rdata
- The result must be a string.
- """
+ marshal.dump((oid, n64(self._tid)+data), self._log_file)
- def store(oid, serial, data, version, transaction):
- """Store data for the object id, oid.
+ def restore(self, oid, serial, data, version, prev_txn, transaction):
+ assert not version
+ if transaction is not self._transaction:
+ raise ZODB.POSException.StorageTransactionError(self, transaction)
- Arguments:
+ marshal.dump((oid, n64(serial)+data), self._log_file)
- oid
- The object identifier. This is either a string
- consisting of 8 nulls or a string previously returned by
- new_oid.
+ def deleteObject(self, oid, oldserial, transaction):
+ if transaction is not self._transaction:
+ raise ZODB.POSException.StorageTransactionError(self, transaction)
+ committed_tid = self.data.get(oid, dlen=8, doff=0)
+ if committed_tid is not None and committed_tid != oldserial:
+ raise ZODB.POSException.ConflictError(
+ oid=oid, serials=(committed_tid, oldserial))
- serial
- The serial of the data that was read when the object was
- loaded from the database. If the object was created in
- the current transaction this will be a string consisting
- of 8 nulls.
+ marshal.dump((oid, n64(self._tid)), self._log_file)
- data
- The data record. This is opaque to the storage.
+ def tpc_abort(self, transaction):
+ self._txn.abort()
+ self._txn = self._transaction = None
+ self._blob_tpc_abort()
+ self._commit_lock.release()
- version
- This must be an empty string. It exists for backward compatibility.
+ def tpc_begin(self, transaction, tid=None, status=' '):
+ if self._read_only:
+ raise ZODB.POSException.ReadOnlyError()
- transaction
- A transaction object. This should match the current
- transaction for the storage, set by tpc_begin.
+ self._commit_lock.acquire()
+ if self._transaction is not None and transaction != self._transaction:
+ self._commit_lock.release()
+ raise ZODB.POSException.StorageTransactionError(self, transaction)
- The new serial for the object is returned, but not necessarily
- immediately. It may be returned directly, or on a subsequent
- store or tpc_vote call.
+ self._transaction = transaction
- The return value may be:
+ ext = transaction._extension.copy()
+ ext['user'] = transaction.user
+ ext['description'] = transaction.description
+ ext = cPickle.dumps(ext, 1)
- - None
+ if tid is None:
+ now = time.time()
+ t = ZODB.TimeStamp.TimeStamp(
+ *(time.gmtime(now)[:5] + (now % 60,)))
+ self._ts = t = t.laterThan(self._ts)
+ self._tid = tid = repr(t)
+ else:
+ self._ts = ZODB.TimeStamp.TimeStamp(tid)
+ self._tid = tid
- - A new serial (string) for the object, or
+ fd, path = tempfile.mkstemp('bsddb')
+ self._log_file = open(path, 'r+b')
+ os.close(fd)
+ marshal.dump((tid, ext), self._log_file)
- - An iterable of object-id and serial pairs giving new serials
- for objects.
+ def tpc_finish(self, transaction, func = lambda tid: None):
+ self._txn.commit()
+ self._txn = self._transaction = None
+ self._blob_tpc_finish()
+ self._commit_lock.release()
- A serial, returned as a string or in a sequence of oid/serial
- pairs, may be the special value
- ZODB.ConflictResolution.ResolvedSerial to indicate that a
- conflict occured and that the object should be invalidated.
+ _transaction_id_suffix = 'x' * (db.DB_GID_SIZE - 8)
+ def tpc_vote(self, transaction):
+ self._txn = txn = self.env.txn_begin()
+ self._log_file.seek(0)
+ tid, ext = marshal.load(self._log_file)
+ self.transactions.put(tid, ext, txn=txn)
+ for oid, record in marhal_iterate(self._log_file):
+ self.data.put(oid, record, txn=txn)
+ self.transaction_oids.put(tid, oid, txn=txn)
+ txn.prepare(self._tid+self._transaction_id_suffix)
- Several different exceptions may be raised when an error occurs.
+def marhal_iterate(f):
+ while 1:
+ try:
+ yield marshal.load(f)
+ except EOFError:
+ break
- ConflictError
- is raised when serial does not match the most recent serial
- number for object oid and the conflict was not resolved by
- the storage.
-
- StorageTransactionError
- is raised when transaction does not match the current
- transaction.
-
- StorageError or, more often, a subclass of it
- is raised when an internal error occurs while the storage is
- handling the store() call.
-
- """
-
- def tpc_abort(transaction):
- """Abort the transaction.
-
- Any changes made by the transaction are discarded.
-
- This call is ignored is the storage is not participating in
- two-phase commit or if the given transaction is not the same
- as the transaction the storage is commiting.
- """
-
- def tpc_begin(transaction):
- """Begin the two-phase commit process.
-
- If storage is already participating in a two-phase commit
- using the same transaction, the call is ignored.
-
- If the storage is already participating in a two-phase commit
- using a different transaction, the call blocks until the
- current transaction ends (commits or aborts).
- """
-
- def tpc_finish(transaction, func = lambda tid: None):
- """Finish the transaction, making any transaction changes permanent.
-
- Changes must be made permanent at this point.
-
- This call is ignored if the storage isn't participating in
- two-phase commit or if it is committing a different
- transaction. Failure of this method is extremely serious.
-
- The second argument is a call-back function that must be
- called while the storage transaction lock is held. It takes
- the new transaction id generated by the transaction.
-
- """
-
- def tpc_vote(transaction):
- """Provide a storage with an opportunity to veto a transaction
-
- This call is ignored if the storage isn't participating in
- two-phase commit or if it is commiting a different
- transaction. Failure of this method is extremely serious.
-
- If a transaction can be committed by a storage, then the
- method should return. If a transaction cannot be committed,
- then an exception should be raised. If this method returns
- without an error, then there must not be an error if
- tpc_finish or tpc_abort is called subsequently.
-
- The return value can be either None or a sequence of object-id
- and serial pairs giving new serials for objects who's ids were
- passed to previous store calls in the same transaction.
- After the tpc_vote call, new serials must have been returned,
- either from tpc_vote or store for objects passed to store.
-
- A serial returned in a sequence of oid/serial pairs, may be
- the special value ZODB.ConflictResolution.ResolvedSerial to
- indicate that a conflict occured and that the object should be
- invalidated.
-
- """
-
-
-
class TransactionContext(object):
def __init__(self, txn):
@@ -372,3 +511,55 @@
def __exit__(self, t, v, tb):
self.cursor.close()
+
+def timetime2tid(timetime):
+ return repr(ZODB.TimeStamp.TimeStamp(
+ *time.gmtime(timetime)[:5]
+ +(time.gmtime(timetime)[5]+divmod(timetime,1)[1],)
+ ))
+
+def DB(path, blob_dir=None, pack=3*86400,
+ read_only=False, create=False,
+ **kw):
+ return ZODB.DB(BSDDBStorage(path, blob_dir, pack, read_only), **kw)
+
+class Records:
+
+ def __init__(self, storage, txn, tid, ext):
+ self.storage = storage
+ self._txn = txn
+ self.tid = tid
+ ext = cPickle.loads(ext)
+ self.user = ext.pop('user', '')
+ self.description = ext.pop('description', '')
+ self.extension = ext
+
+ def __iter__(self):
+ tid = self.tid
+ ntid = n64(tid)
+ with self.storage.cursor(self.storage.transaction_oids, self._txn
+ ) as transaction_oids:
+ kv = transaction_oids.get(tid, flags=db.DB_SET)
+ while kv is not None:
+ ttid, oid = kv
+ assert ttid == tid
+ with self.storage.cursor(self.storage.data, self._txn) as data:
+ doid, rec = data.get(oid, ntid, flags=db.DB_GET_BOTH_RANGE)
+ assert doid == oid
+ dntid = n64(rec[:8])
+ assert dntid == ntid
+ return Record(oid, tid, rec[8:])
+ kv = transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
+
+class Record:
+ def __init__(self, oid, tid, data):
+ self.oid = oid
+ self.tid = tid
+ self.data = data
+
+ version = ''
+ data_txn = None
+
+ def __repr__(self):
+ return repr((u64(self.oid), str(ZODB.TimeStamp.TimeStamp(self.tid)),
+ self.data))
More information about the checkins
mailing list