[Checkins] SVN: ZODB/trunk/src/ZODB/ Implemented checkCurrentSerialInTransaction for the basic storages.

Jim Fulton jim at zope.com
Thu Sep 2 09:55:27 EDT 2010


Log message for revision 116132:
  Implemented checkCurrentSerialInTransaction for the basic storages.
  

Changed:
  U   ZODB/trunk/src/ZODB/BaseStorage.py
  U   ZODB/trunk/src/ZODB/DemoStorage.py
  U   ZODB/trunk/src/ZODB/MappingStorage.py
  U   ZODB/trunk/src/ZODB/tests/BasicStorage.py
  U   ZODB/trunk/src/ZODB/tests/test_storage.py

-=-
Modified: ZODB/trunk/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/BaseStorage.py	2010-09-02 13:55:24 UTC (rev 116131)
+++ ZODB/trunk/src/ZODB/BaseStorage.py	2010-09-02 13:55:26 UTC (rev 116132)
@@ -399,6 +399,19 @@
         dest.tpc_finish(transaction)
 
 
+# defined outside of BaseStorage to facilitate independent reuse.
+# just depends on _transaction attr and getTid method.
+def checkCurrentSerialInTransaction(self, oid, serial, transaction):
+    if transaction is not self._transaction:
+        raise POSException.StorageTransactionError(self, transaction)
+
+    committed_tid = self.getTid(oid)
+    if committed_tid != serial:
+        raise POSException.ReadConflictError(
+            oid=oid, serials=(committed_tid, serial))
+
+BaseStorage.checkCurrentSerialInTransaction = checkCurrentSerialInTransaction
+
 class TransactionRecord(object):
     """Abstract base class for iterator protocol"""
 

Modified: ZODB/trunk/src/ZODB/DemoStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/DemoStorage.py	2010-09-02 13:55:24 UTC (rev 116131)
+++ ZODB/trunk/src/ZODB/DemoStorage.py	2010-09-02 13:55:26 UTC (rev 116132)
@@ -24,6 +24,7 @@
 import weakref
 import tempfile
 import threading
+import ZODB.BaseStorage
 import ZODB.blob
 import ZODB.interfaces
 import ZODB.MappingStorage
@@ -304,6 +305,9 @@
                     oid, oldserial, data, blobfilename, '', transaction)
             raise
 
+    checkCurrentSerialInTransaction = (
+        ZODB.BaseStorage.checkCurrentSerialInTransaction)
+
     def temporaryDirectory(self):
         try:
             return self.changes.temporaryDirectory()

Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py	2010-09-02 13:55:24 UTC (rev 116131)
+++ ZODB/trunk/src/ZODB/MappingStorage.py	2010-09-02 13:55:26 UTC (rev 116132)
@@ -21,6 +21,7 @@
 import cPickle
 import time
 import threading
+import ZODB.BaseStorage
 import ZODB.interfaces
 import ZODB.POSException
 import ZODB.TimeStamp
@@ -261,6 +262,9 @@
 
         return self._tid
 
+    checkCurrentSerialInTransaction = (
+        ZODB.BaseStorage.checkCurrentSerialInTransaction)
+
     # ZODB.interfaces.IStorage
     @ZODB.utils.locked(opened)
     def tpc_abort(self, transaction):

Modified: ZODB/trunk/src/ZODB/tests/BasicStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/BasicStorage.py	2010-09-02 13:55:24 UTC (rev 116131)
+++ ZODB/trunk/src/ZODB/tests/BasicStorage.py	2010-09-02 13:55:26 UTC (rev 116132)
@@ -199,3 +199,89 @@
         self._storage.tpc_vote(t)
         self._storage.tpc_finish(t)
         t.commit()
+
+    def check_checkCurrentSerialInTransaction(self):
+        oid = '\0\0\0\0\0\0\0\xf0'
+        tid = self._dostore(oid)
+        tid2 = self._dostore(oid, revid=tid)
+
+
+        # stale read
+        transaction.begin()
+        t = transaction.get()
+        self._storage.tpc_begin(t)
+        try:
+            self._storage.store('\0\0\0\0\0\0\0\xf1',
+                                '\0\0\0\0\0\0\0\0', 'x', '', t)
+            self._storage.checkCurrentSerialInTransaction(oid, tid, t)
+            self._storage.tpc_vote(t)
+        except POSException.ReadConflictError, v:
+            self.assert_(v.oid) == oid
+            self.assert_(v.serials == (tid2, tid))
+        else:
+            self.assert_(False, "No conflict error")
+
+        self._storage.tpc_abort(t)
+
+
+        # non-stale read, no stress. :)
+        transaction.begin()
+        t = transaction.get()
+        self._storage.tpc_begin(t)
+        self._storage.store('\0\0\0\0\0\0\0\xf2',
+                            '\0\0\0\0\0\0\0\0', 'x', '', t)
+        self._storage.checkCurrentSerialInTransaction(oid, tid2, t)
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+
+        # non-stale read, competition after vote.  The competing
+        # transaction most produce a tid > this transaction's tid
+        transaction.begin()
+        t = transaction.get()
+        self._storage.tpc_begin(t)
+        self._storage.store('\0\0\0\0\0\0\0\xf3',
+                            '\0\0\0\0\0\0\0\0', 'x', '', t)
+        self._storage.checkCurrentSerialInTransaction(oid, tid2, t)
+        self._storage.tpc_vote(t)
+
+        # We'll run the competing trans in a separate thread:
+        import threading, time
+        thread = threading.Thread(name='T1',
+            target=self._dostore, args=(oid,), kwargs=dict(revid=tid2))
+        thread.start()
+        time.sleep(.1)
+        self._storage.tpc_finish(t)
+        thread.join()
+
+        tid3 = self._storage.load(oid)[1]
+        self.assert_(tid3 > self._storage.load('\0\0\0\0\0\0\0\xf3')[1])
+
+        # non-stale competing trans after checkCurrentSerialInTransaction
+        transaction.begin()
+        t = transaction.get()
+        self._storage.tpc_begin(t)
+        self._storage.store('\0\0\0\0\0\0\0\xf4',
+                            '\0\0\0\0\0\0\0\0', 'x', '', t)
+        self._storage.checkCurrentSerialInTransaction(oid, tid3, t)
+
+        # We'll run the competing trans in a separate thread:
+        thread = threading.Thread(name='T2',
+            target=self._dostore, args=(oid,), kwargs=dict(revid=tid3))
+        thread.start()
+        time.sleep(.1)
+
+        # There are 2 possibilities:
+        # 1. The store happens before this transaction completes,
+        #    in which case, the vote below fails.
+        # 2. The store happens after this trans, in which case, the
+        #    tid of the object is greater than this transaction's tid.
+        try:
+            self._storage.tpc_vote(t)
+        except ReadConflictError:
+            thread.join() # OK :)
+        else:
+            self._storage.tpc_finish(t)
+            thread.join()
+            tid4 = self._storage.load(oid)[1]
+            self.assert_(tid4 > self._storage.load('\0\0\0\0\0\0\0\xf4')[1])
+

Modified: ZODB/trunk/src/ZODB/tests/test_storage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/test_storage.py	2010-09-02 13:55:24 UTC (rev 116131)
+++ ZODB/trunk/src/ZODB/tests/test_storage.py	2010-09-02 13:55:26 UTC (rev 116132)
@@ -73,7 +73,8 @@
     def _clear_temp(self):
         pass
 
-    def load(self, oid, version):
+    def load(self, oid, version=''):
+        assert version == ''
         self._lock_acquire()
         try:
             assert not version



More information about the checkins mailing list