[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/StorageServer.py Rearranged the code a bit, especially rolled up some excess

Jim Fulton jim at zope.com
Tue Dec 23 14:10:47 EST 2008


Log message for revision 94295:
  Rearranged the code a bit, especially rolled up some excess
  abstraction to make the code a little more readable while trying to
  decipher the storage locking logic.
  

Changed:
  U   ZODB/trunk/src/ZEO/StorageServer.py

-=-
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2008-12-23 18:58:25 UTC (rev 94294)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2008-12-23 19:10:46 UTC (rev 94295)
@@ -123,7 +123,7 @@
         self.database = database
 
     def notifyConnected(self, conn):
-        self.connection = conn # For restart_other() below
+        self.connection = conn
         assert conn.peer_protocol_version is not None
         if conn.peer_protocol_version < 'Z309':
             self.client = ClientStub308(conn)
@@ -143,9 +143,23 @@
         # any pending transaction.
         if self.transaction is not None:
             self.log("disconnected during transaction %s" % self.transaction)
-            self._abort()
+            if not self.locked:
+                # Delete (d, zeo_storage) from the _waiting list, if found.
+                waiting = self.storage._waiting
+                for i in range(len(waiting)):
+                    d, z = waiting[i]
+                    if z is self:
+                        del waiting[i]
+                        self.log("Closed connection removed from waiting list."
+                                 " Clients waiting: %d." % len(waiting))
+                        break
+
+            if self.transaction:
+                self.tpc_abort(self.transaction.id)
+
         else:
             self.log("disconnected")
+
         if self.stats is not None:
             self.stats.clients -= 1
 
@@ -412,7 +426,6 @@
         if not self._check_tid(id):
             return
         assert self.locked
-        self.stats.active_txns -= 1
         self.stats.commits += 1
         self.storage.tpc_finish(self.transaction)
         tid = self.storage.lastTransaction()
@@ -426,7 +439,6 @@
     def tpc_abort(self, id):
         if not self._check_tid(id):
             return
-        self.stats.active_txns -= 1
         self.stats.aborts += 1
         if self.locked:
             self.storage.tpc_abort(self.transaction)
@@ -434,6 +446,7 @@
 
     def _clear_transaction(self):
         # Common code at end of tpc_finish() and tpc_abort()
+        self.stats.active_txns -= 1
         self.transaction = None
         self.txnlog.close()
         if self.locked:
@@ -441,28 +454,116 @@
             self.timeout.end(self)
             self.stats.lock_time = None
             self.log("Transaction released storage lock", BLATHER)
-            # _handle_waiting() can start another transaction (by
-            # restarting a waiting one) so must be done last
-            self._handle_waiting()
 
-    def _abort(self):
-        # called when a connection is closed unexpectedly
-        if not self.locked:
-            # Delete (d, zeo_storage) from the _waiting list, if found.
-            waiting = self.storage._waiting
-            for i in range(len(waiting)):
-                d, z = waiting[i]
-                if z is self:
-                    del waiting[i]
-                    self.log("Closed connection removed from waiting list."
-                             " Clients waiting: %d." % len(waiting))
-                    break
+            # Restart any client waiting for the storage lock.
+            while self.storage._waiting:
+                delay, zeo_storage = self.storage._waiting.pop(0)
+                try:
+                    zeo_storage._restart(delay)
+                except:
+                    self.log("Unexpected error handling waiting transaction",
+                             level=logging.WARNING, exc_info=True)
+                    zeo_storage.connection.close()
+                    continue
 
-        if self.transaction:
-            self.stats.active_txns -= 1
-            self.stats.aborts += 1
-            self.tpc_abort(self.transaction.id)
+                if self.storage._waiting:
+                    n = len(self.storage._waiting)
+                    self.log("Blocked transaction restarted.  "
+                             "Clients waiting: %d" % n)
+                else:
+                    self.log("Blocked transaction restarted.")
 
+                break
+
+    # The following two methods return values, so they must acquire
+    # the storage lock and begin the transaction before returning.
+
+    # It's a bit vile that undo can cause us to get the lock before vote.
+
+    def undo(self, trans_id, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        if self.locked:
+            return self._undo(trans_id)
+        else:
+            return self._wait(lambda: self._undo(trans_id))
+
+    def vote(self, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        if self.locked:
+            return self._vote()
+        else:
+            return self._wait(lambda: self._vote())
+
+    # When a delayed transaction is restarted, the dance is
+    # complicated.  The restart occurs when one ZEOStorage instance
+    # finishes as a transaction and finds another instance is in the
+    # _waiting list.
+
+    # It might be better to have a mechanism to explicitly send
+    # the finishing transaction's reply before restarting the waiting
+    # transaction.  If the restart takes a long time, the previous
+    # client will be blocked until it finishes.
+
+    def _wait(self, thunk):
+        # Wait for the storage lock to be acquired.
+        self._thunk = thunk
+        if self.tpc_transaction():
+            d = Delay()
+            self.storage._waiting.append((d, self))
+            self.log("Transaction blocked waiting for storage. "
+                     "Clients waiting: %d." % len(self.storage._waiting))
+            return d
+        else:
+            self.log("Transaction acquired storage lock.", BLATHER)
+            return self._restart()
+
+    def _restart(self, delay=None):
+        # Restart when the storage lock is available.
+        if self.txnlog.stores == 1:
+            template = "Preparing to commit transaction: %d object, %d bytes"
+        else:
+            template = "Preparing to commit transaction: %d objects, %d bytes"
+        self.log(template % (self.txnlog.stores, self.txnlog.size()),
+                 level=BLATHER)
+
+        self.locked = 1
+        self.timeout.begin(self)
+        self.stats.lock_time = time.time()
+        if (self.tid is not None) or (self.status != ' '):
+            self.storage.tpc_begin(self.transaction, self.tid, self.status)
+        else:
+            self.storage.tpc_begin(self.transaction)
+
+        loads, loader = self.txnlog.get_loader()
+        for i in range(loads):
+            store = loader.load()
+            store_type = store[0]
+            store_args = store[1:]
+
+            if store_type == 'd':
+                do_store = self._delete
+            elif store_type == 's':
+                do_store = self._store
+            elif store_type == 'r':
+                do_store = self._restore
+            else:
+                raise ValueError('Invalid store type: %r' % store_type)
+
+            if not do_store(*store_args):
+                break
+
+        # Blob support
+        while self.blob_log:
+            oid, oldserial, data, blobfilename = self.blob_log.pop()
+            self.storage.storeBlob(oid, oldserial, data, blobfilename,
+                                   '', self.transaction,)
+
+        resp = self._thunk()
+        if delay is not None:
+            delay.reply(resp)
+        else:
+            return resp
+
     # The public methods of the ZEO client API do not do the real work.
     # They defer work until after the storage lock has been acquired.
     # Most of the real implementations are in methods beginning with
@@ -487,7 +588,7 @@
         assert self.blob_tempfile is None
         self.blob_tempfile = tempfile.mkstemp(
             dir=self.storage.temporaryDirectory())
-        
+
     def storeBlobChunk(self, chunk):
         os.write(self.blob_tempfile[0], chunk)
 
@@ -506,23 +607,6 @@
     def sendBlob(self, oid, serial):
         self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
 
-    # The following four methods return values, so they must acquire
-    # the storage lock and begin the transaction before returning.
-
-    def vote(self, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        if self.locked:
-            return self._vote()
-        else:
-            return self._wait(lambda: self._vote())
-
-    def undo(self, trans_id, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        if self.locked:
-            return self._undo(trans_id)
-        else:
-            return self._wait(lambda: self._undo(trans_id))
-
     def _delete(self, oid, serial):
         err = None
         try:
@@ -635,102 +719,6 @@
         self.invalidated.extend(oids)
         return tid, oids
 
-    # When a delayed transaction is restarted, the dance is
-    # complicated.  The restart occurs when one ZEOStorage instance
-    # finishes as a transaction and finds another instance is in the
-    # _waiting list.
-
-    # It might be better to have a mechanism to explicitly send
-    # the finishing transaction's reply before restarting the waiting
-    # transaction.  If the restart takes a long time, the previous
-    # client will be blocked until it finishes.
-
-    def _wait(self, thunk):
-        # Wait for the storage lock to be acquired.
-        self._thunk = thunk
-        if self.tpc_transaction():
-            d = Delay()
-            self.storage._waiting.append((d, self))
-            self.log("Transaction blocked waiting for storage. "
-                     "Clients waiting: %d." % len(self.storage._waiting))
-            return d
-        else:
-            self.log("Transaction acquired storage lock.", BLATHER)
-            return self._restart()
-
-    def _restart(self, delay=None):
-        # Restart when the storage lock is available.
-        if self.txnlog.stores == 1:
-            template = "Preparing to commit transaction: %d object, %d bytes"
-        else:
-            template = "Preparing to commit transaction: %d objects, %d bytes"
-        self.log(template % (self.txnlog.stores, self.txnlog.size()),
-                 level=BLATHER)
-
-        self.locked = 1
-        self.timeout.begin(self)
-        self.stats.lock_time = time.time()
-        if (self.tid is not None) or (self.status != ' '):
-            self.storage.tpc_begin(self.transaction, self.tid, self.status)
-        else:
-            self.storage.tpc_begin(self.transaction)
-
-        loads, loader = self.txnlog.get_loader()
-        for i in range(loads):
-            store = loader.load()
-            store_type = store[0]
-            store_args = store[1:]
-
-            if store_type == 'd':
-                do_store = self._delete
-            elif store_type == 's':
-                do_store = self._store
-            elif store_type == 'r':
-                do_store = self._restore
-            else:
-                raise ValueError('Invalid store type: %r' % store_type)
-
-            if not do_store(*store_args):
-                break
-
-        # Blob support
-        while self.blob_log:
-            oid, oldserial, data, blobfilename = self.blob_log.pop()
-            self.storage.storeBlob(oid, oldserial, data, blobfilename, 
-                                   '', self.transaction,)
-
-        resp = self._thunk()
-        if delay is not None:
-            delay.reply(resp)
-        else:
-            return resp
-
-    def _handle_waiting(self):
-        # Restart any client waiting for the storage lock.
-        while self.storage._waiting:
-            delay, zeo_storage = self.storage._waiting.pop(0)
-            if self._restart_other(zeo_storage, delay):
-                if self.storage._waiting:
-                    n = len(self.storage._waiting)
-                    self.log("Blocked transaction restarted.  "
-                             "Clients waiting: %d" % n)
-                else:
-                    self.log("Blocked transaction restarted.")
-                return
-
-    def _restart_other(self, zeo_storage, delay):
-        # Return True if the server restarted.
-        # call the restart() method on the appropriate server.
-        try:
-            zeo_storage._restart(delay)
-        except:
-            self.log("Unexpected error handling waiting transaction",
-                     level=logging.WARNING, exc_info=True)
-            zeo_storage.connection.close()
-            return 0
-        else:
-            return 1
-
     # IStorageIteration support
 
     def iterator_start(self, start, stop):
@@ -809,8 +797,8 @@
 
     def invalidateCache(self):
         self.server._invalidateCache(self.storage_id)
-        
 
+
 class StorageServer:
 
     """The server side implementation of ZEO.
@@ -1064,8 +1052,8 @@
                 p.connection.trigger.pull_trigger()
             except ZEO.zrpc.error.DisconnectedError:
                 pass
-        
 
+
     def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
         """Internal: broadcast info and invalidations to clients.
 
@@ -1110,8 +1098,8 @@
         #
         # b. A connection is closes while we are iterating. We'll need
         #    to cactch and ignore Disconnected errors.
-        
 
+
         if invalidated:
             invq = self.invq[storage_id]
             if len(invq) >= self.invq_bound:
@@ -1138,11 +1126,11 @@
         do full cache verification.
         """
 
-        
+
         invq = self.invq[storage_id]
 
         # We make a copy of invq because it might be modified by a
-        # foreign (other than main thread) calling invalidate above.        
+        # foreign (other than main thread) calling invalidate above.
         invq = invq[:]
 
         if not invq:
@@ -1421,4 +1409,4 @@
     def __getattr__(self, name):
         return getattr(self.storage, name)
 
-    
+



More information about the Zodb-checkins mailing list