[Checkins] SVN: transaction/trunk/ Switched to using threading.local for the thread-based transaction

Jim Fulton jim at zope.com
Wed May 12 18:14:34 EDT 2010


Log message for revision 112266:
  Switched to using threading.local for the thread-based transaction
  manager. This fixed:
  
  When threads were reused, transaction data could leak accross them,
    causing subtle application bugs.
  
  https://bugs.launchpad.net/zodb/+bug/239086
  

Changed:
  U   transaction/trunk/CHANGES.txt
  U   transaction/trunk/transaction/_manager.py
  U   transaction/trunk/transaction/tests/test_transaction.py

-=-
Modified: transaction/trunk/CHANGES.txt
===================================================================
--- transaction/trunk/CHANGES.txt	2010-05-12 20:42:44 UTC (rev 112265)
+++ transaction/trunk/CHANGES.txt	2010-05-12 22:14:34 UTC (rev 112266)
@@ -28,12 +28,16 @@
 
 Bugs fixed:
 
-
 - Fixed a bug that caused extra commit calls to be made on data
   managers under certain special circumstances.
 
   https://mail.zope.org/pipermail/zodb-dev/2010-May/013329.html
 
+- When threads were reused, transaction data could leak accross them,
+  causing subtle application bugs.
+
+  https://bugs.launchpad.net/zodb/+bug/239086
+
 1.0.1 (2010-05-07)
 ------------------
 

Modified: transaction/trunk/transaction/_manager.py
===================================================================
--- transaction/trunk/transaction/_manager.py	2010-05-12 20:42:44 UTC (rev 112265)
+++ transaction/trunk/transaction/_manager.py	2010-05-12 22:14:34 UTC (rev 112266)
@@ -21,16 +21,8 @@
 from transaction._transaction import Transaction
 from transaction.interfaces import TransientError
 
-import thread
+import threading
 
-
-
-
-# Used for deprecated arguments.  ZODB.utils.DEPRECATED_ARGUMENT was
-# too hard to use here, due to the convoluted import dance across
-# __init__.py files.
-_marker = object()
-
 # We have to remember sets of synch objects, especially Connections.
 # But we don't want mere registration with a transaction manager to
 # keep a synch object alive forever; in particular, it's common
@@ -38,7 +30,6 @@
 # a Connection alive keeps a potentially huge number of other objects
 # alive (e.g., the cache, and everything reachable from it too).
 # Therefore we use "weak sets" internally.
-#
 
 # Call the ISynchronizer newTransaction() method on every element of
 # WeakSet synchs.
@@ -58,7 +49,6 @@
 # so that Transactions "see" synchronizers that get registered after the
 # Transaction object is constructed.
 
-
 class TransactionManager(object):
 
     def __init__(self):
@@ -129,62 +119,12 @@
                 return True
 
 
-class ThreadTransactionManager(TransactionManager):
+class ThreadTransactionManager(TransactionManager, threading.local):
     """Thread-aware transaction manager.
 
     Each thread is associated with a unique transaction.
     """
 
-    def __init__(self):
-        # _threads maps thread ids to transactions
-        self._txns = {}
-
-        # _synchs maps a thread id to a WeakSet of registered synchronizers.
-        # The WeakSet is passed to the Transaction constructor, because the
-        # latter needs to call the synchronizers when it commits.
-        self._synchs = {}
-
-    def begin(self):
-        tid = thread.get_ident()
-        txn = self._txns.get(tid)
-        if txn is not None:
-            txn.abort()
-
-        synchs = self._synchs.get(tid)
-        if synchs is None:
-            synchs = self._synchs[tid] = WeakSet()
-
-        txn = self._txns[tid] = Transaction(synchs, self)
-        _new_transaction(txn, synchs)
-        return txn
-
-    def get(self):
-        tid = thread.get_ident()
-        txn = self._txns.get(tid)
-        if txn is None:
-            synchs = self._synchs.get(tid)
-            if synchs is None:
-                synchs = self._synchs[tid] = WeakSet()
-            txn = self._txns[tid] = Transaction(synchs, self)
-        return txn
-
-    def free(self, txn):
-        tid = thread.get_ident()
-        assert txn is self._txns.get(tid)
-        del self._txns[tid]
-
-    def registerSynch(self, synch):
-        tid = thread.get_ident()
-        ws = self._synchs.get(tid)
-        if ws is None:
-            ws = self._synchs[tid] = WeakSet()
-        ws.add(synch)
-
-    def unregisterSynch(self, synch):
-        tid = thread.get_ident()
-        ws = self._synchs[tid]
-        ws.remove(synch)
-
 class Attempt(object):
 
     def __init__(self, manager):

Modified: transaction/trunk/transaction/tests/test_transaction.py
===================================================================
--- transaction/trunk/transaction/tests/test_transaction.py	2010-05-12 20:42:44 UTC (rev 112265)
+++ transaction/trunk/transaction/tests/test_transaction.py	2010-05-12 22:14:34 UTC (rev 112266)
@@ -673,8 +673,8 @@
       >>> do = DataObject(mgr)
 
       >>> t = transaction.begin()
-      >>> len(t._manager._txns)
-      1
+      >>> t._manager._txn is not None
+      True
 
       >>> t.addAfterCommitHook(hook, ('-', 1))
       >>> transaction.commit()
@@ -682,12 +682,67 @@
       >>> log
       ["True arg '-' kw1 1 kw2 'no_kw2'"]
 
-      >>> len(t._manager._txns)
-      0
+      >>> t._manager._txn is not None
+      False
 
       >>> reset_log()
     """
 
+def bug239086():
+    """
+    The original implementation of thread transaction manager made
+    invalid assumptions about thread ids.
+
+    >>> import transaction.tests.savepointsample
+    >>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
+    >>> dm.keys()
+    []
+
+    >>> class Sync:
+    ...      def __init__(self, label):
+    ...          self.label = label
+    ...      def beforeCompletion(self, t):
+    ...          print self.label, 'before'
+    ...      def afterCompletion(self, t):
+    ...          print self.label, 'after'
+    ...      def newTransaction(self, t):
+    ...          print self.label, 'new'
+    >>> sync = Sync(1)
+
+    >>> import threading
+    >>> def run_in_thread(f):
+    ...     t = threading.Thread(target=f)
+    ...     t.start()
+    ...     t.join()
+
+    >>> @run_in_thread
+    ... def first():
+    ...     transaction.manager.registerSynch(sync)
+    ...     transaction.manager.begin()
+    ...     dm['a'] = 1
+    1 new
+
+    >>> @run_in_thread
+    ... def second():
+    ...     transaction.abort() # should do nothing.
+
+    >>> dm.keys()
+    ['a']
+
+    >>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
+    >>> dm.keys()
+    []
+
+    >>> @run_in_thread
+    ... def first():
+    ...     dm['a'] = 1
+
+    >>> transaction.abort() # should do nothing
+    >>> dm.keys()
+    ['a']
+
+    """
+
 def test_suite():
     suite = unittest.TestSuite((
         DocFileSuite('doom.txt'),



More information about the checkins mailing list