[Checkins] SVN: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py checkpoint
Jim Fulton
jim at zope.com
Fri Nov 13 17:38:58 EST 2009
Log message for revision 105637:
checkpoint
Changed:
U zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
-=-
Modified: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
===================================================================
--- zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py 2009-11-13 21:28:01 UTC (rev 105636)
+++ zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py 2009-11-13 22:38:58 UTC (rev 105637)
@@ -40,7 +40,7 @@
zope.interface.implements(
ZODB.interfaces.IStorage,
ZODB.interfaces.IStorageRestoreable,
-# ZODB.interfaces.IStorageIteration,
+ ZODB.interfaces.IStorageIteration,
# ZODB.interfaces.IStorageCurrentRecordIteration,
ZODB.interfaces.IExternalGC,
)
@@ -120,6 +120,15 @@
self._commit_lock = threading.Lock()
+ # The "current" lock (_current_lock) makes sure that information
+ # about the current data for objects is ordered consistently.
+ # In particular, we want to avoid the following scenario:
+ # - A thread reads current data via load.
+ # - Another thread updates the data via tpc_finish and sends
+ #   invalidations.
+ # - The first thread's load returns the old data after the
+ #   invalidations have been processed.
+ self._current_lock = RWLock()
+
def txn(self, flags=0):
return TransactionContext(self.env.txn_begin(flags=flags))
@@ -173,7 +182,7 @@
while kv:
tid, ext = kv
yield Records(self, txn, tid, ext)
- kv = transactions.get(tid, flags=db.DB_NEXT)
+ kv = transactions.get(tid, flags=db.DB_NEXT)
# def record_iternext(next=None):
# pass # XXX
@@ -187,16 +196,17 @@
return self.data.stat(db.DB_FAST_STAT)['nkeys']
def load(self, oid, version=''):
- with self.txn() as txn:
- with self.cursor(self.data, txn) as cursor:
- kv = cursor.get(oid, db.DB_SET)
- if kv:
- record = kv[1]
- data = record[8:]
- if data:
- return data, n64(record[:8])
+ with self._current_lock.read():
+ with self.txn() as txn:
+ with self.cursor(self.data, txn) as cursor:
+ kv = cursor.get(oid, db.DB_SET)
+ if kv:
+ record = kv[1]
+ data = record[8:]
+ if data:
+ return data, n64(record[:8])
- raise ZODB.POSException.POSKeyError(oid)
+ raise ZODB.POSException.POSKeyError(oid)
def loadBefore(self, oid, tid):
with self.txn(db.DB_TXN_SNAPSHOT) as txn:
@@ -394,6 +404,8 @@
assert not version
if transaction is not self._transaction:
raise ZODB.POSException.StorageTransactionError(self, transaction)
+
+ result = self._tid
committed_tid = self.data.get(oid, dlen=8, doff=0)
if committed_tid is not None:
committed_tid = n64(committed_tid)
@@ -406,8 +418,10 @@
data=data)
else:
data = rdata
+ result = ZODB.ConflictResolution.ResolvedSerial
marshal.dump((oid, n64(self._tid)+data), self._log_file)
+ return result
def restore(self, oid, serial, data, version, prev_txn, transaction):
assert not version
@@ -464,10 +478,12 @@
marshal.dump((tid, ext), self._log_file)
def tpc_finish(self, transaction, func = lambda tid: None):
- self._txn.commit()
- self._txn = self._transaction = None
- self._blob_tpc_finish()
- self._commit_lock.release()
+ with self._current_lock.write():
+ self._txn.commit()
+ func(self._tid)
+ self._txn = self._transaction = None
+ self._blob_tpc_finish()
+ self._commit_lock.release()
_transaction_id_suffix = 'x' * (db.DB_GID_SIZE - 8)
def tpc_vote(self, transaction):
@@ -523,7 +539,7 @@
**kw):
return ZODB.DB(BSDDBStorage(path, blob_dir, pack, read_only), **kw)
-class Records:
+class Records(object):
def __init__(self, storage, txn, tid, ext):
self.storage = storage
@@ -540,15 +556,15 @@
with self.storage.cursor(self.storage.transaction_oids, self._txn
) as transaction_oids:
kv = transaction_oids.get(tid, flags=db.DB_SET)
- while kv is not None:
+ while kv:
ttid, oid = kv
assert ttid == tid
with self.storage.cursor(self.storage.data, self._txn) as data:
doid, rec = data.get(oid, ntid, flags=db.DB_GET_BOTH_RANGE)
assert doid == oid
- dntid = n64(rec[:8])
+ dntid = rec[:8]
assert dntid == ntid
- return Record(oid, tid, rec[8:])
+ yield Record(oid, tid, rec[8:])
kv = transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
class Record:
@@ -563,3 +579,88 @@
def __repr__(self):
return repr((u64(self.oid), str(ZODB.TimeStamp.TimeStamp(self.tid)),
self.data))
+
+
+class RWLock(object):
+    """Readers/writer lock built on a single condition variable.
+
+    ``readers`` is the number of threads currently holding the lock for
+    reading, or -1 while a writer holds it.  ``write_waiting`` is the
+    number of writers blocked in ``acquire_write``; new readers wait
+    while it is non-zero.
+    """
+
+    def __init__(self):
+        self.readers = self.write_waiting = 0
+        self.condition = threading.Condition()
+
+    def read(self):
+        """Return a context manager that holds the lock for reading."""
+        return ReadLockContext(self)
+
+    def write(self):
+        """Return a context manager that holds the lock for writing."""
+        return WriteLockContext(self)
+
+    def acquire_write(self):
+        """Block until no reader or writer holds the lock, then take it."""
+        with self.condition:
+            if self.readers:
+                # Announce the pending write so that new readers hold off.
+                self.write_waiting += 1
+                try:
+                    while self.readers:
+                        self.condition.wait()
+                except BaseException:
+                    # The wait was interrupted.  Withdraw the request and
+                    # wake readers that were held back by it; otherwise
+                    # they would stay blocked for good.
+                    self.write_waiting -= 1
+                    self.condition.notifyAll()
+                    raise
+                self.write_waiting -= 1
+            self.readers = -1
+
+    def release_write(self):
+        """Give up the write lock and wake every waiting thread."""
+        with self.condition:
+            assert self.readers == -1
+            self.readers = 0
+            # Any number of readers may be waiting and all of them may
+            # proceed together; notify() would wake just one and leave
+            # the others blocked until that one released the lock.
+            self.condition.notifyAll()
+
+    def acquire_read(self):
+        """Block while a writer holds or is waiting for the lock."""
+        with self.condition:
+            while self.write_waiting or self.readers < 0:
+                self.condition.wait()
+            self.readers += 1
+
+    def release_read(self):
+        """Give up a read lock; the last reader out wakes the waiters."""
+        with self.condition:
+            assert self.readers > 0
+            self.readers -= 1
+            if self.readers == 0:
+                self.condition.notifyAll()
+
+class WriteLockContext(object):
+    """Context manager holding ``lock`` for writing during a ``with`` block."""
+
+    def __init__(self, lock):
+        # Object providing acquire_write() and release_write().
+        self.lock = lock
+
+    def __enter__(self):
+        # Nothing is returned, so ``with ... as x`` binds None.
+        rwlock = self.lock
+        rwlock.acquire_write()
+
+    def __exit__(self, *exc_info):
+        # Release unconditionally.  The implicit None result lets any
+        # exception raised inside the block propagate.
+        rwlock = self.lock
+        rwlock.release_write()
+
+class ReadLockContext(object):
+    """Context manager holding ``lock`` for reading during a ``with`` block."""
+
+    def __init__(self, lock):
+        # Object providing acquire_read() and release_read().
+        self.lock = lock
+
+    def __enter__(self):
+        # Nothing is returned, so ``with ... as x`` binds None.
+        rwlock = self.lock
+        rwlock.acquire_read()
+
+    def __exit__(self, *exc_info):
+        # Release unconditionally.  The implicit None result lets any
+        # exception raised inside the block propagate.
+        rwlock = self.lock
+        rwlock.release_read()
+
More information about the checkins
mailing list