[Checkins] SVN: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py checkpoint

Jim Fulton jim at zope.com
Fri Nov 13 17:38:58 EST 2009


Log message for revision 105637:
  checkpoint

Changed:
  U   zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py

-=-
Modified: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
===================================================================
--- zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py	2009-11-13 21:28:01 UTC (rev 105636)
+++ zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py	2009-11-13 22:38:58 UTC (rev 105637)
@@ -40,7 +40,7 @@
     zope.interface.implements(
         ZODB.interfaces.IStorage,
         ZODB.interfaces.IStorageRestoreable,
-#         ZODB.interfaces.IStorageIteration,
+        ZODB.interfaces.IStorageIteration,
 #         ZODB.interfaces.IStorageCurrentRecordIteration,
         ZODB.interfaces.IExternalGC,
         )
@@ -120,6 +120,15 @@
 
         self._commit_lock = threading.Lock()
 
+        # The current lock is used to make sure we consistently order
+        # information about current data for objects.  In particular,
+        # we want to avoid the following scenario:
+        # - A thread reads current data via load
+        # - another thread updates data via tpc_finish and sends invalidations
+        # - The first thread's load returns the old data after the
+        #   invalidations have been processed.
+        self._current_lock = RWLock() 
+
     def txn(self, flags=0):
         return TransactionContext(self.env.txn_begin(flags=flags))
 
@@ -173,7 +182,7 @@
                 while kv:
                     tid, ext = kv
                     yield Records(self, txn, tid, ext)
-                kv = transactions.get(tid, flags=db.DB_NEXT)
+                    kv = transactions.get(tid, flags=db.DB_NEXT)
 
 #     def record_iternext(next=None):
 #         pass # XXX
@@ -187,16 +196,17 @@
         return self.data.stat(db.DB_FAST_STAT)['nkeys']
 
     def load(self, oid, version=''):
-        with self.txn() as txn:
-            with self.cursor(self.data, txn) as cursor:
-                kv = cursor.get(oid, db.DB_SET)
-                if kv:
-                    record = kv[1]
-                    data = record[8:]
-                    if data:
-                        return data, n64(record[:8])
+        with self._current_lock.read():
+            with self.txn() as txn:
+                with self.cursor(self.data, txn) as cursor:
+                    kv = cursor.get(oid, db.DB_SET)
+                    if kv:
+                        record = kv[1]
+                        data = record[8:]
+                        if data:
+                            return data, n64(record[:8])
 
-                raise ZODB.POSException.POSKeyError(oid)
+                    raise ZODB.POSException.POSKeyError(oid)
 
     def loadBefore(self, oid, tid):
         with self.txn(db.DB_TXN_SNAPSHOT) as txn:
@@ -394,6 +404,8 @@
         assert not version
         if transaction is not self._transaction:
             raise ZODB.POSException.StorageTransactionError(self, transaction)
+
+        result = self._tid
         committed_tid = self.data.get(oid, dlen=8, doff=0)
         if committed_tid is not None:
             committed_tid = n64(committed_tid)
@@ -406,8 +418,10 @@
                         data=data)
                 else:
                     data = rdata
+                    result = ZODB.ConflictResolution.ResolvedSerial
 
         marshal.dump((oid, n64(self._tid)+data), self._log_file)
+        return result
 
     def restore(self, oid, serial, data, version, prev_txn, transaction):
         assert not version
@@ -464,10 +478,12 @@
         marshal.dump((tid, ext), self._log_file)
 
     def tpc_finish(self, transaction, func = lambda tid: None):
-        self._txn.commit()
-        self._txn = self._transaction = None
-        self._blob_tpc_finish()
-        self._commit_lock.release()
+        with self._current_lock.write():
+            self._txn.commit()
+            func(self._tid)
+            self._txn = self._transaction = None
+            self._blob_tpc_finish()
+            self._commit_lock.release()
 
     _transaction_id_suffix = 'x' * (db.DB_GID_SIZE - 8)
     def tpc_vote(self, transaction):
@@ -523,7 +539,7 @@
        **kw):
     return ZODB.DB(BSDDBStorage(path, blob_dir, pack, read_only), **kw)
 
-class Records:
+class Records(object):
 
     def __init__(self, storage, txn, tid, ext):
         self.storage = storage
@@ -540,15 +556,15 @@
         with self.storage.cursor(self.storage.transaction_oids, self._txn
                                  ) as transaction_oids:
             kv = transaction_oids.get(tid, flags=db.DB_SET)
-            while kv is not None:
+            while kv:
                 ttid, oid = kv
                 assert ttid == tid
                 with self.storage.cursor(self.storage.data, self._txn) as data:
                     doid, rec = data.get(oid, ntid, flags=db.DB_GET_BOTH_RANGE)
                     assert doid == oid
-                    dntid = n64(rec[:8])
+                    dntid = rec[:8]
                     assert dntid == ntid
-                    return Record(oid, tid, rec[8:])
+                    yield Record(oid, tid, rec[8:])
                 kv = transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
 
 class Record:
@@ -563,3 +579,88 @@
     def __repr__(self):
         return repr((u64(self.oid), str(ZODB.TimeStamp.TimeStamp(self.tid)),
                      self.data))
+
+
+class RWLock:
+
+    def __init__(self):
+        self.readers = self.write_waiting = 0
+        self.condition = threading.Condition()
+
+    def read(self):
+        return ReadLockContext(self)
+
+    def write(self):
+        return WriteLockContext(self)
+
+    def acquire_write(self):
+        self.condition.acquire()
+        try:
+            if not self.readers:
+                self.readers = -1
+                return
+            self.write_waiting += 1
+            self.condition.wait()
+            while self.readers:
+                self.condition.wait()
+            self.write_waiting -= 1
+            self.readers = -1
+        finally:
+            self.condition.release()
+
+    def release_write(self):
+        self.condition.acquire()
+        try:
+            assert self.readers == -1
+            self.readers = 0
+            if self.write_waiting:
+                self.condition.notifyAll()
+            else:
+                self.condition.notify()
+        finally:
+            self.condition.release()
+
+    def acquire_read(self):
+        self.condition.acquire()
+        try:
+            while self.write_waiting or self.readers < 0:
+                self.condition.wait()
+            self.readers += 1
+        finally:
+            self.condition.release()
+
+    def release_read(self):
+        self.condition.acquire()
+        try:
+            assert self.readers > 0
+            self.readers -= 1
+            if self.readers == 0:
+                if self.write_waiting:
+                    self.condition.notifyAll()
+                else:
+                    self.condition.notify()
+        finally:
+            self.condition.release()
+
+class WriteLockContext(object):
+
+    def __init__(self, lock):
+        self.lock = lock
+
+    def __enter__(self):
+        self.lock.acquire_write()
+
+    def __exit__(self, *args):
+        self.lock.release_write()
+
+class ReadLockContext(object):
+
+    def __init__(self, lock):
+        self.lock = lock
+
+    def __enter__(self):
+        self.lock.acquire_read()
+
+    def __exit__(self, *args):
+        self.lock.release_read()
+        



More information about the checkins mailing list