[Checkins] SVN: relstorage/trunk/ Made compatible with ZODB 3.10.0b7.

Shane Hathaway shane at hathawaymix.org
Thu Sep 30 08:01:01 EDT 2010


Log message for revision 117084:
  Made compatible with ZODB 3.10.0b7.
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  U   relstorage/trunk/relstorage/adapters/mover.py
  U   relstorage/trunk/relstorage/storage.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2010-09-30 11:04:05 UTC (rev 117083)
+++ relstorage/trunk/CHANGES.txt	2010-09-30 12:01:00 UTC (rev 117084)
@@ -2,6 +2,8 @@
 1.4.0 (2010-09-30)
 ------------------
 
+- Made compatible with ZODB 3.10.0b7.
+
 - Enabled ketama and compression in pylibmc_wrapper.  Both options
   are better for clusters.
 

Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	2010-09-30 11:04:05 UTC (rev 117083)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2010-09-30 12:01:00 UTC (rev 117084)
@@ -209,6 +209,9 @@
         Returns None if no later state exists.
         """
 
+    def current_object_tids(cursor, oids):
+        """Returns the current {oid: tid} for specified object ids."""
+
     def on_store_opened(cursor, restart=False):
         """Create the temporary table for storing objects.
 

Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py	2010-09-30 11:04:05 UTC (rev 117083)
+++ relstorage/trunk/relstorage/adapters/mover.py	2010-09-30 12:01:00 UTC (rev 117084)
@@ -45,6 +45,7 @@
         'exists',
         'load_before',
         'get_object_tid_after',
+        'current_object_tids',
         'on_store_opened',
         'make_batcher',
         'store_temp',
@@ -346,6 +347,31 @@
 
 
 
+    def generic_current_object_tids(self, cursor, oids):
+        """Returns the current {oid: tid} for specified object ids."""
+        res = {}
+        if self.keep_history:
+            table = 'current_object'
+        else:
+            table = 'object_state'
+        oids = list(oids)
+        while oids:
+            oid_list = ','.join(str(oid) for oid in oids[:1000])
+            del oids[:1000]
+            stmt = "SELECT zoid, tid FROM %s WHERE zoid IN (%s)" % (
+                table, oid_list)
+            cursor.execute(stmt)
+            for oid, tid in cursor:
+                res[oid] = tid
+        return res
+
+    postgresql_current_object_tids = generic_current_object_tids
+    mysql_current_object_tids = generic_current_object_tids
+    oracle_current_object_tids = generic_current_object_tids
+
+
+
+
     def postgresql_on_store_opened(self, cursor, restart=False):
         """Create the temporary table for storing objects"""
         # note that the md5 column is not used if self.keep_history == False.

Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py	2010-09-30 11:04:05 UTC (rev 117083)
+++ relstorage/trunk/relstorage/storage.py	2010-09-30 12:01:00 UTC (rev 117084)
@@ -70,7 +70,9 @@
 # early rather than wait for an explicit abort.
 abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
 
+z64 = '\0' * 8
 
+
 class RelStorage(
         UndoLogCompatible,
         ConflictResolution.ConflictResolvingStorage
@@ -97,7 +99,7 @@
     _tid = None
 
     # _ltid is the ID of the last transaction committed by this instance.
-    _ltid = None
+    _ltid = z64
 
     # _prepared_txn is the name of the transaction to commit in the
     # second phase.
@@ -132,6 +134,10 @@
     # currently uncommitted transaction.
     _txn_blobs = None
 
+    # _txn_check_serials: {oid, serial}; confirms that certain objects
+    # have not changed at commit.
+    _txn_check_serials = None
+
     # _batcher: An object that accumulates store operations
     # so they can be executed in batch (to minimize latency).
     _batcher = None
@@ -430,7 +436,7 @@
 
         logfunc('; '.join(msg))
 
-    def load(self, oid, version):
+    def load(self, oid, version=''):
         oid_int = u64(oid)
         cache = self._cache
 
@@ -461,7 +467,7 @@
 
     getSerial = getTid  # ZODB 3.7
 
-    def loadEx(self, oid, version):
+    def loadEx(self, oid, version=''):
         # Since we don't support versions, just tack the empty version
         # string onto load's result.
         return self.load(oid, version) + ("",)
@@ -593,6 +599,27 @@
             self._lock_release()
 
 
+    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
+        if transaction is not self._transaction:
+            raise POSException.StorageTransactionError(self, transaction)
+
+        _, committed_tid = self.load(oid, '')
+        if committed_tid != serial:
+            raise POSException.ReadConflictError(
+                oid=oid, serials=(committed_tid, serial))
+
+        if self._txn_check_serials is None:
+            self._txn_check_serials = {}
+        else:
+            # If this transaction already specified a different serial for
+            # this oid, the transaction conflicts with itself.
+            previous_serial = self._txn_check_serials.get(oid, serial)
+            if previous_serial != serial:
+                raise POSException.ReadConflictError(
+                    oid=oid, serials=(previous_serial, serial))
+        self._txn_check_serials[oid] = serial
+
+
     def tpc_begin(self, transaction, tid=None, status=' '):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
@@ -688,6 +715,7 @@
         self._max_stored_oid = 0
         self._batcher = None
         self._txn_blobs = None
+        self._txn_check_serials = None
         self._cache.clear_temp()
 
 
@@ -789,6 +817,16 @@
         self._prepare_tid()
         tid_int = u64(self._tid)
 
+        if self._txn_check_serials:
+            oid_ints = [u64(oid) for oid in self._txn_check_serials.iterkeys()]
+            current = self._adapter.mover.current_object_tids(cursor, oid_ints)
+            for oid, expect in self._txn_check_serials.iteritems():
+                oid_int = u64(oid)
+                actual = p64(current.get(oid_int, 0))
+                if actual != expect:
+                    raise POSException.ReadConflictError(
+                        oid=oid, serials=(actual, expect))
+
         serials = self._finish_store()
         self._adapter.mover.update_current(cursor, tid_int)
         self._prepared_txn = self._adapter.txncontrol.commit_phase1(
@@ -892,9 +930,12 @@
                     if not os.listdir(dirname):
                         ZODB.blob.remove_committed_dir(dirname)
 
-
     def lastTransaction(self):
-        return self._ltid
+        self._lock_acquire()
+        try:
+            return self._ltid
+        finally:
+            self._lock_release()
 
     def new_oid(self):
         if self._is_read_only:



More information about the checkins mailing list