[Zodb-checkins] CVS: ZODB3/ZODB - Transaction.py:1.39.16.2

Jeremy Hylton jeremy@zope.com
Fri, 1 Nov 2002 14:34:38 -0500


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv22082/ZODB

Modified Files:
      Tag: ZODB3-deadlock-debug-branch
	Transaction.py 
Log Message:
Major rewrite to guarantee jars are locked in a globally consistent order.

Too many changes to do justice to in a checkin message.  We'll write a
better comment when this gets committed to the trunk.


=== ZODB3/ZODB/Transaction.py 1.39.16.1 => 1.39.16.2 ===
--- ZODB3/ZODB/Transaction.py:1.39.16.1	Thu Oct 31 16:33:09 2002
+++ ZODB3/ZODB/Transaction.py	Fri Nov  1 14:34:37 2002
@@ -19,23 +19,17 @@
 import time, sys, struct, POSException
 from struct import pack
 from string import split, strip, join
-from zLOG import LOG, ERROR, PANIC
+from zLOG import LOG, ERROR, PANIC, INFO, BLATHER
 from POSException import ConflictError
+from ZODB import utils
 
 # Flag indicating whether certain errors have occurred.
 hosed=0
 
-def get_jars_in_order(jars):
-    # make sure jarsv contains the jars in the order they
-    # were added
-    L = []
-    for jar, order in jars.values():
-        L.append((order, jar))
-    L.sort()
-    jarsv = []
-    for order, jar in L:
-        jarsv.append(jar)
-    return jarsv
+def jar_cmp(j1, j2):
+    # Call sortKey() every time, because a ZEO client could reconnect
+    # to a different server at any time.
+    return cmp(j1.sortKey(), j2.sortKey())
 
 class Transaction:
     'Simple transaction objects for single-threaded applications.'
@@ -65,6 +59,9 @@
             for c in self._connections.values(): c.close()
             del self._connections
 
+    def log(self, msg, level=INFO, error=None):
+        LOG("TM:%s" % self._id, level, msg, error=error)
+
     def sub(self):
         # Create a manually managed subtransaction for internal use
         r=self.__class__()
@@ -89,6 +86,8 @@
         entered two-phase commit yet, so no tpc_ messages are sent.
         '''
 
+        self.log("abort")
+
         if subtransaction and (self._non_st_objects is not None):
             raise POSException.TransactionError, (
                 """Attempted to abort a sub-transaction, but a participating
@@ -96,11 +95,8 @@
                 """)
 
         t = None
-        subj = self._sub
-        subjars = ()
 
         if not subtransaction:
-
             # Must add in any non-subtransaction supporting objects that
             # may have been stowed away from previous subtransaction
             # commits.
@@ -108,11 +104,14 @@
                 self._objects.extend(self._non_st_objects)
                 self._non_st_objects = None
 
-            if subj is not None:
+            if self._sub is not None:
                 # Abort of top-level transaction after commiting
                 # subtransactions.
-                subjars = subj.values()
+                subjars = self._sub.values()
+                subjars.sort(jar_cmp)
                 self._sub = None
+            else:
+                subjars = []
 
         try:
             # Abort the objects
@@ -122,13 +121,20 @@
                     if j is not None:
                         j.abort(o, self)
                 except:
+                    # Record the first exception that occurred
                     if t is None:
                         t, v, tb = sys.exc_info()
+                    else:
+                        self.log("Failed to abort object %016x" %
+                                 utils.U64(o._p_oid), error=sys.exc_info())
 
-            # Ugh, we need to abort work done in sub-transactions.
-            while subjars:
-                j = subjars.pop()
-                j.abort_sub(self) # This should never fail
+            # tpc_begin() was never called, so tpc_abort() should not be
+            # called.
+
+            if not subtransaction:
+                # abort_sub() must be called to clear subtransaction state
+                for jar in subjars:
+                    jar.abort_sub(self) # This should never fail
 
             if t is not None:
                 raise t, v, tb
@@ -148,7 +154,9 @@
 
         This aborts any transaction in progres.
         '''
-        if self._objects: self.abort(subtransaction, 0)
+        if self._objects:
+            self.abort(subtransaction, 0)
+        self.log("begin")
         if info:
             info=split(info,'\t')
             self.user=strip(info[0])
@@ -157,31 +165,36 @@
     def commit(self, subtransaction=None):
         'Finalize the transaction'
 
+        self.log("commit")
+
         objects = self._objects
-        jars = {}
-        jarsv = None
-        subj = self._sub
-        subjars = ()
 
         if subtransaction:
-            if subj is None:
-                self._sub = subj = {}
+            if self._sub is None:
+                # Must store state across multiple subtransactions
+                # so that the final commit can commit all subjars.
+                self._sub = {}
         else:
-            if subj is not None:
+            if self._sub is not None:
+                # This commit is for a top-level transaction that
+                # has previously committed subtransactions.  Do
+                # one last subtransaction commit to clear out the
+                # current objects, then commit all the subjars.
                 if objects:
-                    # Do an implicit sub-transaction commit:
                     self.commit(1)
-                    # XXX What does this do?
                     objects = []
-                subjars = subj.values()
+                subjars = self._sub.values()
+                subjars.sort(jar_cmp)
                 self._sub = None
 
-        # If not a subtransaction, then we need to add any non-
-        # subtransaction-supporting objects that may have been
-        # stowed away during subtransaction commits to _objects.
-        if (subtransaction is None) and (self._non_st_objects is not None):
-            objects.extend(self._non_st_objects)
-            self._non_st_objects = None
+                # If there were any non-subtransaction-aware jars
+                # involved in earlier subtransaction commits, we need
+                # to add them to the list of jars to commit.
+                if self._non_st_objects is not None:
+                    objects.extend(self._non_st_objects)
+                    self._non_st_objects = None
+            else:
+                subjars = []
 
         if (objects or subjars) and hosed:
             # Something really bad happened and we don't
@@ -200,90 +213,118 @@
         #   either call tpc_abort or tpc_finish. It is OK to call
         #   these multiple times, as the storage is required to ignore
         #   these calls if tpc_begin has not been called.
+        #
+        # - That we call tpc_begin() in a globally consistent order,
+        #   so that concurrent transactions involving multiple storages
+        #   do not deadlock.
         try:
             ncommitted = 0
+            jars = self._get_jars(objects, subtransaction)
             try:
-                # the values in jars will be 2-tuples containing a jar
-                # and an int, where the int indicates the order in
-                # which the jars were added.
-                ncommitted += self._commit_objects(objects, jars,
-                                                   subtransaction, subj)
-
-                self._commit_subtrans(jars, subjars)
-                jarsv = get_jars_in_order(jars)
-                for jar in jarsv:
-                    if not subtransaction:
+                self._commit_begin(jars, subtransaction)
+                ncommitted += self._commit_objects(objects)
+                if not subtransaction and subjars:
+                    self._commit_subtrans(subjars)
+                if not subtransaction:
+                    # Unless this is a really old jar that doesn't
+                    # implement tpc_vote(), it must raise an exception
+                    # if it can't commit the transaction.
+                    for jar in jars:
                         try:
                             vote = jar.tpc_vote
-                        except:
+                        except AttributeError:
                             pass
                         else:
-                            vote(self) # last chance to bail
+                            vote(self)
 
                 # Handle multiple jars separately.  If there are
                 # multiple jars and one fails during the finish, we
                 # mark this transaction manager as hosed.
-                if len(jarsv) == 1:
-                    self._finish_one(jarsv[0])
+                if len(jars) == 1:
+                    self._finish_one(jars[0])
                 else:
-                    self._finish_many(jarsv)
+                    self._finish_many(jars)
             except:
                 # Ugh, we got an got an error during commit, so we
-                # have to clean up.
-                exc_info = sys.exc_info()
-                if jarsv is None:
-                    jarsv = get_jars_in_order(jars)
-                self._commit_error(exc_info, objects, ncommitted,
-                                   jarsv, subjars)
+                # have to clean up.  First save the original exception
+                # in case the cleanup process causes another
+                # exception.
+                t, v, tb = sys.exc_info()
+                self._commit_error(objects, ncommitted, jars, subjars)
+                raise t, v, tb
         finally:
             del objects[:] # clear registered
             if not subtransaction and self._id is not None:
                 free_transaction()
 
-    def _commit_objects(self, objects, jars, subtransaction, subj):
-        # commit objects and return number of commits
-        ncommitted = 0
+    def _get_jars(self, objects, subtransaction):
+        # Returns a list of jars for this transaction.
+        
+        # Find all the jars and sort them in a globally consistent order.
+        # objects is a list of persistent objects and jars.
+        # If this is a subtransaction and a jar is not subtransaction aware,
+        # it's object gets delayed until the parent transaction commits.
+        
+        d = {}
         for o in objects:
-            j = getattr(o, '_p_jar', o)
-            if j is not None:
-                i = id(j)
-                if not jars.has_key(i):
-                    jars[i] = j, len(jars)
-
-                    if subtransaction:
-                        # If a jar does not support subtransactions,
-                        # we need to save it away to be committed in
-                        # the outer transaction.
-                        try:
-                            j.tpc_begin(self, subtransaction)
-                        except TypeError:
-                            j.tpc_begin(self)
+            jar = getattr(o, '_p_jar', o)
+            if jar is None:
+                # I don't think this should ever happen, but can't
+                # prove that it won't.  If there is no jar, there
+                # is nothing to be done.
+                self.log("Object with no jar registered for transaction: "
+                         "%s" % repr(o), level=BLATHER)
+                continue
+            # jar may not be safe as a dictionary key
+            key = id(jar)
+            d[key] = jar
+
+            if subtransaction:
+                if hasattr(jar, "commit_sub"):
+                    self._sub[key] = jar
+                else:
+                    if self._non_st_objects is None:
+                        self._non_st_objects = []
+                    self._non_st_objects.append(o)
+                
+        jars = d.values()
+        jars.sort(jar_cmp)
+
+        return jars
+
+    def _commit_begin(self, jars, subtransaction):
+        for jar in jars:
+            if subtransaction:
+                try:
+                    jar.tpc_begin(self, subtransaction)
+                except TypeError:
+                    # Assume that TypeError means that tpc_begin() only
+                    # takes one argument, and that the jar doesn't
+                    # support subtransactions.
+                    jar.tpc_begin(self)
+            else:
+                self.log("tpc_begin %s" % jar)
+                jar.tpc_begin(self)
 
-                        if hasattr(j, 'commit_sub'):
-                            subj[i] = j
-                        else:
-                            if self._non_st_objects is None:
-                                self._non_st_objects = []
-                            self._non_st_objects.append(o)
-                            continue
-                    else:
-                        j.tpc_begin(self)
-                j.commit(o, self)
+    def _commit_objects(self, objects):
+        ncommitted = 0
+        for o in objects:
+            jar = getattr(o, "_p_jar", o)
+            if jar is None:
+                continue
+            jar.commit(o, self)
             ncommitted += 1
         return ncommitted
 
-    def _commit_subtrans(self, jars, subjars):
+    def _commit_subtrans(self, subjars):
         # Commit work done in subtransactions
-        while subjars:
-            j = subjars.pop()
-            i = id(j)
-            if not jars.has_key(i):
-                jars[i] = j, len(jars)
-            j.commit_sub(self)
+        for jar in subjars:
+            jar.commit_sub(self)
 
     def _finish_one(self, jar):
         try:
-            jar.tpc_finish(self) # This should never fail
+            # The database can't guarantee consistency if call fails.
+            jar.tpc_finish(self)
         except:
             # Bug if it does, we need to keep track of it
             LOG('ZODB', ERROR,
@@ -292,42 +333,40 @@
                 error=sys.exc_info())
             raise
 
-    def _finish_many(self, jarsv):
+    def _finish_many(self, jars):
         global hosed
         try:
-            while jarsv:
-                jarsv[-1].tpc_finish(self) # This should never fail
-                jarsv.pop() # It didn't, so it's taken care of.
+            for jar in jars:
+                # The database can't guarantee consistency if call fails.
+                jar.tpc_finish(self)
         except:
-            # Bug if it does, we need to yell FIRE!
-            # Someone finished, so don't allow any more
-            # work without at least a restart!
             hosed = 1
             LOG('ZODB', PANIC,
                 "A storage error occurred in the last phase of a "
                 "two-phase commit.  This shouldn\'t happen. "
-                "The application may be in a hosed state, so "
-                "transactions will not be allowed to commit "
+                "The application will not be allowed to commit "
                 "until the site/storage is reset by a restart. ",
                 error=sys.exc_info())
             raise
 
-    def _commit_error(self, (t, v, tb),
-                      objects, ncommitted, jarsv, subjars):
-        # handle an exception raised during commit
-        # takes sys.exc_info() as argument
-
-        # First, we have to abort any uncommitted objects.
+    def _commit_error(self, objects, ncommitted, jars, subjars):
+        # First, we have to abort any uncommitted objects.  The abort
+        # will mark the object for invalidation, so that it's last
+        # committed state will be restored.
         for o in objects[ncommitted:]:
             try:
                 j = getattr(o, '_p_jar', o)
                 if j is not None:
                     j.abort(o, self)
             except:
-                pass
-
-        # Then, we unwind TPC for the jars that began it.
-        for j in jarsv:
+                # nothing to do but log the error
+                self.log("Failed to abort object %016x" % utils.U64(o._p_oid),
+                         error=sys.exc_info())
+
+        # Abort the two-phase commit.  It's only necessary to abort the
+        # commit for jars that began it, but it is harmless to abort it
+        # for all.
+        for j in jars:
             try:
                 j.tpc_abort(self) # This should never fail
             except:
@@ -335,9 +374,14 @@
                     "A storage error occured during object abort. This "
                     "shouldn't happen. ", error=sys.exc_info())
 
-        # Ugh, we need to abort work done in sub-transactions.
-        while subjars:
-            j = subjars.pop()
+        # After the tpc_abort(), call abort_sub() on all the
+        # subtrans-aware jars to *really* abort the subtransaction.
+        
+        # Example: For Connection(), the tpc_abort() will abort the
+        # subtransaction TmpStore() and abort_sub() will remove the
+        # TmpStore.
+
+        for j in subjars:
             try:
                 j.abort_sub(self) # This should never fail
             except:
@@ -346,8 +390,6 @@
                     "object abort.  This shouldn't happen.",
                     error=sys.exc_info())
 
-        raise t, v, tb
-
     def register(self,object):
         'Register the given object for transaction control.'
         self._append(object)
@@ -379,8 +421,6 @@
 the system problem.  See your application log for
 information on the error that lead to this problem.
 """
-
-
 
 ############################################################################
 # install get_transaction: