[Checkins] SVN: transaction/trunk/ Switched to using threading.local for the thread-based transaction
Jim Fulton
jim at zope.com
Wed May 12 18:14:34 EDT 2010
Log message for revision 112266:
Switched to using threading.local for the thread-based transaction
manager. This fixed:
When threads were reused, transaction data could leak accross them,
causing subtle application bugs.
https://bugs.launchpad.net/zodb/+bug/239086
Changed:
U transaction/trunk/CHANGES.txt
U transaction/trunk/transaction/_manager.py
U transaction/trunk/transaction/tests/test_transaction.py
-=-
Modified: transaction/trunk/CHANGES.txt
===================================================================
--- transaction/trunk/CHANGES.txt 2010-05-12 20:42:44 UTC (rev 112265)
+++ transaction/trunk/CHANGES.txt 2010-05-12 22:14:34 UTC (rev 112266)
@@ -28,12 +28,16 @@
Bugs fixed:
-
- Fixed a bug that caused extra commit calls to be made on data
managers under certain special circumstances.
https://mail.zope.org/pipermail/zodb-dev/2010-May/013329.html
+- When threads were reused, transaction data could leak accross them,
+ causing subtle application bugs.
+
+ https://bugs.launchpad.net/zodb/+bug/239086
+
1.0.1 (2010-05-07)
------------------
Modified: transaction/trunk/transaction/_manager.py
===================================================================
--- transaction/trunk/transaction/_manager.py 2010-05-12 20:42:44 UTC (rev 112265)
+++ transaction/trunk/transaction/_manager.py 2010-05-12 22:14:34 UTC (rev 112266)
@@ -21,16 +21,8 @@
from transaction._transaction import Transaction
from transaction.interfaces import TransientError
-import thread
+import threading
-
-
-
-# Used for deprecated arguments. ZODB.utils.DEPRECATED_ARGUMENT was
-# too hard to use here, due to the convoluted import dance across
-# __init__.py files.
-_marker = object()
-
# We have to remember sets of synch objects, especially Connections.
# But we don't want mere registration with a transaction manager to
# keep a synch object alive forever; in particular, it's common
@@ -38,7 +30,6 @@
# a Connection alive keeps a potentially huge number of other objects
# alive (e.g., the cache, and everything reachable from it too).
# Therefore we use "weak sets" internally.
-#
# Call the ISynchronizer newTransaction() method on every element of
# WeakSet synchs.
@@ -58,7 +49,6 @@
# so that Transactions "see" synchronizers that get registered after the
# Transaction object is constructed.
-
class TransactionManager(object):
def __init__(self):
@@ -129,62 +119,12 @@
return True
-class ThreadTransactionManager(TransactionManager):
+class ThreadTransactionManager(TransactionManager, threading.local):
"""Thread-aware transaction manager.
Each thread is associated with a unique transaction.
"""
- def __init__(self):
- # _threads maps thread ids to transactions
- self._txns = {}
-
- # _synchs maps a thread id to a WeakSet of registered synchronizers.
- # The WeakSet is passed to the Transaction constructor, because the
- # latter needs to call the synchronizers when it commits.
- self._synchs = {}
-
- def begin(self):
- tid = thread.get_ident()
- txn = self._txns.get(tid)
- if txn is not None:
- txn.abort()
-
- synchs = self._synchs.get(tid)
- if synchs is None:
- synchs = self._synchs[tid] = WeakSet()
-
- txn = self._txns[tid] = Transaction(synchs, self)
- _new_transaction(txn, synchs)
- return txn
-
- def get(self):
- tid = thread.get_ident()
- txn = self._txns.get(tid)
- if txn is None:
- synchs = self._synchs.get(tid)
- if synchs is None:
- synchs = self._synchs[tid] = WeakSet()
- txn = self._txns[tid] = Transaction(synchs, self)
- return txn
-
- def free(self, txn):
- tid = thread.get_ident()
- assert txn is self._txns.get(tid)
- del self._txns[tid]
-
- def registerSynch(self, synch):
- tid = thread.get_ident()
- ws = self._synchs.get(tid)
- if ws is None:
- ws = self._synchs[tid] = WeakSet()
- ws.add(synch)
-
- def unregisterSynch(self, synch):
- tid = thread.get_ident()
- ws = self._synchs[tid]
- ws.remove(synch)
-
class Attempt(object):
def __init__(self, manager):
Modified: transaction/trunk/transaction/tests/test_transaction.py
===================================================================
--- transaction/trunk/transaction/tests/test_transaction.py 2010-05-12 20:42:44 UTC (rev 112265)
+++ transaction/trunk/transaction/tests/test_transaction.py 2010-05-12 22:14:34 UTC (rev 112266)
@@ -673,8 +673,8 @@
>>> do = DataObject(mgr)
>>> t = transaction.begin()
- >>> len(t._manager._txns)
- 1
+ >>> t._manager._txn is not None
+ True
>>> t.addAfterCommitHook(hook, ('-', 1))
>>> transaction.commit()
@@ -682,12 +682,67 @@
>>> log
["True arg '-' kw1 1 kw2 'no_kw2'"]
- >>> len(t._manager._txns)
- 0
+ >>> t._manager._txn is not None
+ False
>>> reset_log()
"""
+def bug239086():
+ """
+ The original implementation of thread transaction manager made
+ invalid assumptions about thread ids.
+
+ >>> import transaction.tests.savepointsample
+ >>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
+ >>> dm.keys()
+ []
+
+ >>> class Sync:
+ ... def __init__(self, label):
+ ... self.label = label
+ ... def beforeCompletion(self, t):
+ ... print self.label, 'before'
+ ... def afterCompletion(self, t):
+ ... print self.label, 'after'
+ ... def newTransaction(self, t):
+ ... print self.label, 'new'
+ >>> sync = Sync(1)
+
+ >>> import threading
+ >>> def run_in_thread(f):
+ ... t = threading.Thread(target=f)
+ ... t.start()
+ ... t.join()
+
+ >>> @run_in_thread
+ ... def first():
+ ... transaction.manager.registerSynch(sync)
+ ... transaction.manager.begin()
+ ... dm['a'] = 1
+ 1 new
+
+ >>> @run_in_thread
+ ... def second():
+ ... transaction.abort() # should do nothing.
+
+ >>> dm.keys()
+ ['a']
+
+ >>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
+ >>> dm.keys()
+ []
+
+ >>> @run_in_thread
+ ... def first():
+ ... dm['a'] = 1
+
+ >>> transaction.abort() # should do nothing
+ >>> dm.keys()
+ ['a']
+
+ """
+
def test_suite():
suite = unittest.TestSuite((
DocFileSuite('doom.txt'),
More information about the checkins
mailing list