[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Simplified vote handling and made it more robust to storage vote

Jim Fulton jim at zope.com
Thu Apr 29 17:27:56 EDT 2010


Log message for revision 111593:
  Simplified vote handling and made it more robust to storage vote
  failures.
  
  Also made the disconnected in transaction messages show the locked
  status.
  

Changed:
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/testZEO2.py

-=-
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2010-04-29 19:53:17 UTC (rev 111592)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2010-04-29 21:27:55 UTC (rev 111593)
@@ -145,7 +145,8 @@
         # When this storage closes, we must ensure that it aborts
         # any pending transaction.
         if self.transaction is not None:
-            self.log("disconnected during transaction %s" % self.transaction)
+            self.log("disconnected during %s transaction"
+                     % self.locked and 'locked' or 'unlocked')
             self.tpc_abort(self.transaction.id)
         else:
             self.log("disconnected")
@@ -442,8 +443,7 @@
         if not self._check_tid(tid):
             return
         self.stats.aborts += 1
-        if self.locked:
-            self.storage.tpc_abort(self.transaction)
+        self.storage.tpc_abort(self.transaction)
         self._clear_transaction()
 
     def _clear_transaction(self):
@@ -470,8 +470,40 @@
         self.locked = self.server.lock_storage(self)
         if self.locked:
             try:
-                self._vote()
+                self.log(
+                    "Preparing to commit transaction: %d objects, %d bytes"
+                    % (self.txnlog.stores, self.txnlog.size()),
+                    level=BLATHER)
+
+                if (self.tid is not None) or (self.status != ' '):
+                    self.storage.tpc_begin(self.transaction,
+                                           self.tid, self.status)
+                else:
+                    self.storage.tpc_begin(self.transaction)
+
+                for op, args in self.txnlog:
+                    if not getattr(self, op)(*args):
+                        break
+
+                # Blob support
+                while self.blob_log and not self.store_failed:
+                    oid, oldserial, data, blobfilename = self.blob_log.pop()
+                    self._store(oid, oldserial, data, blobfilename)
+
+                if not self.store_failed:
+                    # Only call tpc_vote of no store call failed,
+                    # otherwise the serialnos() call will deliver an
+                    # exception that will be handled by the client in
+                    # its tpc_vote() method.
+                    serials = self.storage.tpc_vote(self.transaction)
+                    if serials:
+                        self.serials.extend(serials)
+
+                self.client.serialnos(self.serials)
+
             except Exception:
+                self.storage.tpc_abort(self.transaction)
+                self._clear_transaction()
                 if delay is not None:
                     delay.error()
                 else:
@@ -492,47 +524,6 @@
         if connection is not None:
             connection.call_from_thread(self._try_to_vote, delay)
 
-    def _vote(self):
-
-        if self.txnlog.stores == 1:
-            template = "Preparing to commit transaction: %d object, %d bytes"
-        else:
-            template = "Preparing to commit transaction: %d objects, %d bytes"
-
-        self.log(template % (self.txnlog.stores, self.txnlog.size()),
-                 level=BLATHER)
-
-        if (self.tid is not None) or (self.status != ' '):
-            self.storage.tpc_begin(self.transaction, self.tid, self.status)
-        else:
-            self.storage.tpc_begin(self.transaction)
-
-        try:
-            for op, args in self.txnlog:
-                if not getattr(self, op)(*args):
-                    break
-
-            # Blob support
-            while self.blob_log and not self.store_failed:
-                oid, oldserial, data, blobfilename = self.blob_log.pop()
-                self._store(oid, oldserial, data, blobfilename)
-
-        except:
-            self.storage.tpc_abort(self.transaction)
-            self._clear_transaction()
-            raise
-
-
-        if not self.store_failed:
-            # Only call tpc_vote of no store call failed, otherwise
-            # the serialnos() call will deliver an exception that will be
-            # handled by the client in its tpc_vote() method.
-            serials = self.storage.tpc_vote(self.transaction)
-            if serials:
-                self.serials.extend(serials)
-
-        self.client.serialnos(self.serials)
-
     # The public methods of the ZEO client API do not do the real work.
     # They defer work until after the storage lock has been acquired.
     # Most of the real implementations are in methods beginning with

Modified: ZODB/trunk/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO2.py	2010-04-29 19:53:17 UTC (rev 111592)
+++ ZODB/trunk/src/ZEO/tests/testZEO2.py	2010-04-29 21:27:55 UTC (rev 111593)
@@ -155,7 +155,51 @@
     >>> fs.close()
     """
 
+def errors_in_vote_should_clear_lock():
+    """
 
+So, we arrange to get an error in vote:
+
+    >>> import ZODB.MappingStorage
+    >>> vote_should_fail = True
+    >>> class MappingStorage(ZODB.MappingStorage.MappingStorage):
+    ...     def tpc_vote(*args):
+    ...         if vote_should_fail:
+    ...             raise ValueError
+    ...         return ZODB.MappingStorage.MappingStorage.tpc_vote(*args)
+
+    >>> server = ZEO.tests.servertesting.StorageServer(
+    ...      'x', {'1': MappingStorage()})
+    >>> zs = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn = ZEO.tests.servertesting.Connection(1)
+    >>> zs.notifyConnected(conn)
+    >>> zs.register('1', 0)
+    >>> zs.tpc_begin('0', '', '', {})
+    >>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+    >>> zs.vote('0')
+    Traceback (most recent call last):
+    ...
+    ValueError
+
+When we do, the storage server's transaction lock shouldn't be held:
+
+    >>> '1' in server._commit_locks
+    False
+
+Of course, of vote suceeds, the lock will be held:
+
+    >>> vote_should_fail = False
+    >>> zs.tpc_begin('1', '', '', {})
+    >>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
+    >>> _ = zs.vote('1') # doctest: +ELLIPSIS
+    1 callAsync serialnos ...
+
+    >>> '1' in server._commit_locks
+    True
+
+    """
+
+
 def test_suite():
     return unittest.TestSuite((
         doctest.DocTestSuite(



More information about the Zodb-checkins mailing list