[Checkins] SVN: ZODB/branches/jim-thready-zeo2/src/ZEO/ Fixed a threading bug introduced when switching to having

Jim Fulton jim at zope.com
Mon Sep 21 16:58:30 EDT 2009


Log message for revision 104411:
  Fixed a threading bug introduced when switching to having
  per-connection threads.
  
  Simplified locking logic somewhat -- I think. :)
  
  It's still more complicated than I'd like it to be, in part because of
  undo, which isn't a one-way call but should be.
  

Changed:
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py

-=-
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py	2009-09-21 19:47:54 UTC (rev 104410)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py	2009-09-21 20:58:29 UTC (rev 104411)
@@ -32,6 +32,7 @@
 
 import transaction
 
+import ZODB.blob
 import ZODB.serialize
 import ZODB.TimeStamp
 import ZEO.zrpc.error
@@ -95,14 +96,14 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
-        self.locked = 0
+        self.locked = False             # Don't have storage lock
+        self.locked_lock = threading.Lock() # mediate locked access
         self.verifying = 0
         self.store_failed = 0
         self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
         self.blob_tempfile = None
-        self.blob_log = []
         # The authentication protocol may define extra methods.
         self._extensions = {}
         for func in self.extensions:
@@ -138,25 +139,37 @@
             label = str(host) + ":" + str(port)
         self.log_label = _label + "/" + label
 
+    def notifyLocked(self):
+        # We don't want to give a lock to a disconnected client, and we
+        # need to avoid a race of giving a lock to a client while it's
+        # disconnecting. We check self.connection and set self.locked while
+        # the locked_lock is held, preventing self.connection from being
+        # set to None between the check and setting self.locked.
+        self.locked_lock.acquire()
+        try:
+            if self.connection is None:
+                return False # We've been disconnected. Don't take the lock
+            self.locked = True
+            # What happens if, before processing the trigger, we disconnect,
+            # reconnect, and start a new transaction?
+            # This isn't possible because we never reconnect!
+            self.connection.trigger.pull_trigger(self._restart)
+            return True
+        finally:
+            self.locked_lock.release()
+
     def notifyDisconnected(self):
+        self.locked_lock.acquire()
+        try:
+            self.connection = None
+        finally:
+            self.locked_lock.release()
+
         # When this storage closes, we must ensure that it aborts
         # any pending transaction.
         if self.transaction is not None:
             self.log("disconnected during transaction %s" % self.transaction)
-            if not self.locked:
-                # Delete (d, zeo_storage) from the _waiting list, if found.
-                waiting = self.storage._waiting
-                for i in range(len(waiting)):
-                    d, z = waiting[i]
-                    if z is self:
-                        del waiting[i]
-                        self.log("Closed connection removed from waiting list."
-                                 " Clients waiting: %d." % len(waiting))
-                        break
-
-            if self.transaction:
-                self.tpc_abort(self.transaction.id)
-
+            self.tpc_abort(self.transaction.id)
         else:
             self.log("disconnected")
 
@@ -415,6 +428,7 @@
         self.serials = []
         self.invalidated = []
         self.txnlog = CommitLog()
+        self.blob_log = []
         self.tid = tid
         self.status = status
         self.store_failed = 0
@@ -434,109 +448,99 @@
         if not self._check_tid(id):
             return
         assert self.locked
+
         self.stats.commits += 1
-        self.storage.tpc_finish(self.transaction)
+        self.storage.tpc_finish(self.transaction, self._invalidate)
+        # Note that the tid is still current because we still hold the
+        # commit lock. We'll relinquish it in _clear_transaction.
         tid = self.storage.lastTransaction()
-        if self.invalidated:
-            self.server.invalidate(self, self.storage_id, tid,
-                                   self.invalidated, self.get_size_info())
         self._clear_transaction()
         # Return the tid, for cache invalidation optimization
         return tid
 
-    def tpc_abort(self, id):
-        if not self._check_tid(id):
+    def _invalidate(self, tid):
+        if self.invalidated:
+            self.server.invalidate(self, self.storage_id, tid,
+                                   self.invalidated, self.get_size_info())
+
+    def tpc_abort(self, tid):
+        if not self._check_tid(tid):
             return
         self.stats.aborts += 1
+
+        # Is there a race here?  What if notifyLocked is called after
+        # the check?  Well, we still won't have started committing the actual
+        # storage. That wouldn't happen until _restart is called and that
+        # can't happen while this method is executing, as both are only
+        # run by the client thread. So no race.
         if self.locked:
             self.storage.tpc_abort(self.transaction)
         self._clear_transaction()
 
     def _clear_transaction(self):
         # Common code at end of tpc_finish() and tpc_abort()
+        self.server.unlock_storage(self)
+        self.locked = 0
+        self.transaction = None
         self.stats.active_txns -= 1
-        self.transaction = None
-        self.txnlog.close()
-        if self.locked:
-            self.locked = 0
-            self.timeout.end(self)
-            self.stats.lock_time = None
-            self.log("Transaction released storage lock", BLATHER)
+        if self.txnlog is not None:
+            self.txnlog.close()
+            self.txnlog = None
+            for oid, oldserial, data, blobfilename in self.blob_log:
+                ZODB.blob.remove_committed(blobfilename)
+            del self.blob_log
 
-            # Restart any client waiting for the storage lock.
-            while self.storage._waiting:
-                delay, zeo_storage = self.storage._waiting.pop(0)
-                try:
-                    zeo_storage._restart(delay)
-                except:
-                    self.log("Unexpected error handling waiting transaction",
-                             level=logging.WARNING, exc_info=True)
-                    zeo_storage.connection.close()
-                    continue
-
-                if self.storage._waiting:
-                    n = len(self.storage._waiting)
-                    self.log("Blocked transaction restarted.  "
-                             "Clients waiting: %d" % n)
-                else:
-                    self.log("Blocked transaction restarted.")
-
-                break
-
     # The following two methods return values, so they must acquire
     # the storage lock and begin the transaction before returning.
 
     # It's a bit vile that undo can cause us to get the lock before vote.
 
-    def undo(self, trans_id, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        if self.locked:
+    def undo(self, trans_id, tid):
+        self._check_tid(tid, exc=StorageTransactionError)
+
+        if self.txnlog is not None:
+            return self._wait(lambda: self._undo(trans_id))
+        else:
             return self._undo(trans_id)
+
+    def vote(self, tid):
+        self._check_tid(tid, exc=StorageTransactionError)
+
+        if self.txnlog is not None:
+            return self._wait(lambda: self._vote())
         else:
-            return self._wait(lambda: self._undo(trans_id))
-
-    def vote(self, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        if self.locked:
             return self._vote()
-        else:
-            return self._wait(lambda: self._vote())
 
-    # When a delayed transaction is restarted, the dance is
-    # complicated.  The restart occurs when one ZEOStorage instance
-    # finishes as a transaction and finds another instance is in the
-    # _waiting list.
 
-    # It might be better to have a mechanism to explicitly send
-    # the finishing transaction's reply before restarting the waiting
-    # transaction.  If the restart takes a long time, the previous
-    # client will be blocked until it finishes.
-
+    _thunk = _delay = None
     def _wait(self, thunk):
         # Wait for the storage lock to be acquired.
+        assert self._thunk == self._delay == None
         self._thunk = thunk
-        if self.tpc_transaction():
-            d = Delay()
-            self.storage._waiting.append((d, self))
-            self.log("Transaction blocked waiting for storage. "
-                     "Clients waiting: %d." % len(self.storage._waiting))
-            return d
-        else:
+        if self.server.lock_storage(self):
+            assert not self.tpc_transaction()
             self.log("Transaction acquired storage lock.", BLATHER)
+            self.locked = True
             return self._restart()
 
-    def _restart(self, delay=None):
+        self._delay = d = Delay()
+        return d
+
+    def _restart(self):
+        if not self.locked:
+            # Must have been disconnected after locking
+            assert self.connection is None
+            return
+
         # Restart when the storage lock is available.
         if self.txnlog.stores == 1:
             template = "Preparing to commit transaction: %d object, %d bytes"
         else:
             template = "Preparing to commit transaction: %d objects, %d bytes"
+
         self.log(template % (self.txnlog.stores, self.txnlog.size()),
                  level=BLATHER)
 
-        self.locked = 1
-        self.timeout.begin(self)
-        self.stats.lock_time = time.time()
         if (self.tid is not None) or (self.status != ' '):
             self.storage.tpc_begin(self.transaction, self.tid, self.status)
         else:
@@ -561,17 +565,23 @@
                 break
 
         # Blob support
-        while self.blob_log:
-            oid, oldserial, data, blobfilename = self.blob_log.pop()
+        for oid, oldserial, data, blobfilename in self.blob_log:
             self.storage.storeBlob(oid, oldserial, data, blobfilename,
                                    '', self.transaction,)
 
-        resp = self._thunk()
+        thunk = self._thunk
+        delay = self._delay
+        self._thunk = self._delay = None
+
+        resp = thunk()
         if delay is not None:
             delay.reply(resp)
-        else:
-            return resp
 
+        self.txnlog.close()
+        self.txnlog = None
+        del self.blob_log
+        return resp
+
     # The public methods of the ZEO client API do not do the real work.
     # They defer work until after the storage lock has been acquired.
     # Most of the real implementations are in methods beginning with
@@ -601,14 +611,18 @@
         os.write(self.blob_tempfile[0], chunk)
 
     def storeBlobEnd(self, oid, serial, data, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        assert self.txnlog is not None # effectively not allowed after undo
         fd, tempname = self.blob_tempfile
         self.blob_tempfile = None
         os.close(fd)
         self.blob_log.append((oid, serial, data, tempname))
 
     def storeBlobShared(self, oid, serial, data, filename, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        assert self.txnlog is not None # effectively not allowed after undo
+
         # Reconstruct the full path from the filename in the OID directory
-
         if (os.path.sep in filename
             or not (filename.endswith('.tmp')
                     or filename[:-1].endswith('.tmp')
@@ -722,6 +736,7 @@
         return error
 
     def _vote(self):
+        assert self.locked
         if not self.store_failed:
             # Only call tpc_vote of no store call failed, otherwise
             # the serialnos() call will deliver an exception that will be
@@ -916,8 +931,10 @@
              for name, storage in storages.items()])
         log("%s created %s with storages: %s" %
             (self.__class__.__name__, read_only and "RO" or "RW", msg))
-        for s in storages.values():
-            s._waiting = []
+
+        self.lockers = dict((name, []) for name in storages)
+        self.lockers_lock = threading.Lock()
+
         self.read_only = read_only
         self.auth_protocol = auth_protocol
         self.auth_database = auth_database
@@ -1203,7 +1220,51 @@
             if conn.obj in cl:
                 cl.remove(conn.obj)
 
+    def lock_storage(self, zeostore):
+        self.lockers_lock.acquire()
+        try:
+            storage_id = zeostore.storage_id
+            lockers = self.lockers[storage_id]
+            lockers.append(zeostore)
+            if len(lockers) == 1:
+                self.timeouts[storage_id].begin(zeostore)
+                self.stats[storage_id].lock_time = time.time()
+                return True
+            else:
+                zeostore.log("(%r) queue lock: transactions waiting: %s"
+                             % (storage_id, len(lockers)-1))
+        finally:
+            self.lockers_lock.release()
 
+    def unlock_storage(self, zeostore):
+        self.lockers_lock.acquire()
+        try:
+            storage_id = zeostore.storage_id
+            lockers = self.lockers[storage_id]
+            if zeostore in lockers:
+                if lockers[0] == zeostore:
+                    self.timeouts[storage_id].end(zeostore)
+                    self.stats[storage_id].lock_time = None
+                    lockers.pop(0)
+                    while lockers:
+                        zeostore.log("(%r) unlock: transactions waiting: %s"
+                                     % (storage_id, len(lockers)-1))
+                        zeostore = lockers[0]
+                        if zeostore.notifyLocked():
+                            self.timeouts[storage_id].begin(zeostore)
+                            self.stats[storage_id].lock_time = time.time()
+                            break
+                        else:
+                            # The queued client was closed, so dequeue it
+                            lockers.pop(0)
+                else:
+                    lockers.remove(zeostore)
+                    if lockers:
+                        zeostore.log("(%r) dequeue: transactions waiting: %s"
+                                     % (storage_id, len(lockers)-1))
+        finally:
+            self.lockers_lock.release()
+
 class StubTimeoutThread:
 
     def begin(self, client):
@@ -1225,7 +1286,6 @@
         self._client = None
         self._deadline = None
         self._cond = threading.Condition() # Protects _client and _deadline
-        self._trigger = trigger()
 
     def begin(self, client):
         # Called from the restart code the "main" thread, whenever the
@@ -1268,7 +1328,8 @@
             if howlong <= 0:
                 client.log("Transaction timeout after %s seconds" %
                            self._timeout)
-                self._trigger.pull_trigger(lambda: client.connection.close())
+                client.connection.trigger.pull_trigger(
+                    lambda: client.connection.close())
             else:
                 time.sleep(howlong)
 

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py	2009-09-21 19:47:54 UTC (rev 104410)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py	2009-09-21 20:58:29 UTC (rev 104411)
@@ -734,7 +734,6 @@
         self.storage_id = storage_id
         self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
         self.server.register(storage_id, False)
-        self.server._thunk = lambda : None
         self.server.client = StorageServerClientWrapper()
 
     def sortKey(self):
@@ -756,7 +755,6 @@
         self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
 
     def tpc_vote(self, transaction):
-        self.server._restart()
         self.server.vote(id(transaction))
         result = self.server.client.serials[:]
         del self.server.client.serials[:]



More information about the checkins mailing list