[Checkins] SVN: transaction/branches/sphinx/ Coverage for Transaction.commit.

Tres Seaver cvs-admin at zope.org
Tue Dec 18 02:41:50 UTC 2012


Log message for revision 128741:
  Coverage for Transaction.commit.

Changed:
  _U  transaction/branches/sphinx/
  U   transaction/branches/sphinx/transaction/tests/common.py
  U   transaction/branches/sphinx/transaction/tests/test__transaction.py

-=-
Modified: transaction/branches/sphinx/transaction/tests/common.py
===================================================================
--- transaction/branches/sphinx/transaction/tests/common.py	2012-12-18 01:57:44 UTC (rev 128740)
+++ transaction/branches/sphinx/transaction/tests/common.py	2012-12-18 02:41:50 UTC (rev 128741)
@@ -35,7 +35,7 @@
         else:
             self._log.append((level, msg))
     def debug(self, msg, *args, **kw):
-        self.log('DEBUG', msg, *args, **kw)
+        self.log('debug', msg, *args, **kw)
     def error(self, msg, *args, **kw):
         self.log('error', msg, *args, **kw)
 

Modified: transaction/branches/sphinx/transaction/tests/test__transaction.py
===================================================================
--- transaction/branches/sphinx/transaction/tests/test__transaction.py	2012-12-18 01:57:44 UTC (rev 128740)
+++ transaction/branches/sphinx/transaction/tests/test__transaction.py	2012-12-18 02:41:50 UTC (rev 128741)
@@ -67,7 +67,7 @@
         self.assertEqual(t._extension, {})
         self.assertTrue(t.log is logger)
         self.assertEqual(len(logger._log), 1)
-        self.assertEqual(logger._log[0][0], 'DEBUG')
+        self.assertEqual(logger._log[0][0], 'debug')
         self.assertEqual(logger._log[0][1], 'new transaction')
         self.assertTrue(t._failure_traceback is None)
         self.assertEqual(t._before_commit, [])
@@ -330,6 +330,216 @@
         t.register(dummy)
         self.assertTrue(dummy in adapter.objects)
 
+    def test_commit_DOOMED(self):
+        from transaction.interfaces import DoomedTransaction
+        from transaction._transaction import Status
+        t = self._makeOne()
+        t.status = Status.DOOMED
+        self.assertRaises(DoomedTransaction, t.commit)
+
+    def test_commit_COMMITFAILED(self):
+        from transaction._transaction import Status
+        from transaction.interfaces import TransactionFailedError
+        class _Traceback(object):
+            def getvalue(self):
+                return 'TRACEBACK'
+        t = self._makeOne()
+        t.status = Status.COMMITFAILED
+        t._failure_traceback = _Traceback()
+        self.assertRaises(TransactionFailedError, t.commit)
+
+    def test_commit_wo_savepoints_wo_hooks_wo_synchronizers(self):
+        from transaction._transaction import Status
+        from transaction.tests.common import DummyLogger
+        from transaction.tests.common import Monkey
+        from transaction import _transaction
+        class _Mgr(object):
+            def __init__(self, txn):
+                self._txn = txn
+            def free(self, txn):
+                assert txn is self._txn
+                self._txn = None
+        logger = DummyLogger()
+        with Monkey(_transaction, _LOGGER=logger):
+            t = self._makeOne()
+            logger._clear()
+            mgr = t._manager = _Mgr(t)
+            t.commit()
+        self.assertEqual(t.status, Status.COMMITTED)
+        self.assertTrue(mgr._txn is None)
+        self.assertEqual(logger._log[0][0], 'debug')
+        self.assertEqual(logger._log[0][1], 'commit')
+
+    def test_commit_w_savepoints(self):
+        from weakref import WeakKeyDictionary
+        from transaction.tests.common import DummyLogger
+        from transaction.tests.common import Monkey
+        from transaction import _transaction
+        class _SP(object):
+            def __init__(self, t, index):
+                self.transaction = t
+                self._index = index
+            def __repr__(self):
+                return '_SP: %d' % self._index
+        logger = DummyLogger()
+        with Monkey(_transaction, _LOGGER=logger):
+            t = self._makeOne()
+            t._savepoint2index = WeakKeyDictionary()
+            holdme = []
+            for i in range(10):
+                sp = _SP(t, i)
+                holdme.append(sp)  # strong ref keeps the weak-keyed entry alive
+                t._savepoint2index[sp] = i
+            logger._clear()
+            t.commit()
+        self.assertEqual(list(t._savepoint2index), [])
+
+    def test_commit_w_beforeCommitHooks(self):
+        from transaction.tests.common import DummyLogger
+        from transaction.tests.common import Monkey
+        from transaction import _transaction
+        _hooked1, _hooked2 = [], []
+        def _hook1(*args, **kw):
+            _hooked1.append((args, kw))
+        def _hook2(*args, **kw):
+            _hooked2.append((args, kw))
+        logger = DummyLogger()
+        with Monkey(_transaction, _LOGGER=logger):
+            t = self._makeOne()
+            t._before_commit.append((_hook1, ('one',), {'uno': 1}))
+            t._before_commit.append((_hook2, (), {}))
+            logger._clear()
+            t.commit()
+        self.assertEqual(_hooked1, [(('one',), {'uno': 1})])
+        self.assertEqual(_hooked2, [((), {})])
+        self.assertEqual(t._before_commit, [])
+
+    def test_commit_w_synchronizers(self):
+        from transaction.weakset import WeakSet
+        from transaction.tests.common import DummyLogger
+        from transaction.tests.common import Monkey
+        from transaction import _transaction
+        class _Synch(object):
+            _before = _after = False
+            def beforeCompletion(self, txn):
+                self._before = txn
+            def afterCompletion(self, txn):
+                self._after = txn
+        synchs = [_Synch(), _Synch(), _Synch()]
+        ws = WeakSet()
+        for synch in synchs:
+            ws.add(synch)
+        logger = DummyLogger()
+        with Monkey(_transaction, _LOGGER=logger):
+            t = self._makeOne(synchronizers=ws)
+            logger._clear()
+            t.commit()
+        for synch in synchs:
+            self.assertTrue(synch._before is t)
+            self.assertTrue(synch._after is t)
+
+    def test_commit_w_afterCommitHooks(self):
+        from transaction.tests.common import DummyLogger
+        from transaction.tests.common import Monkey
+        from transaction import _transaction
+        _hooked1, _hooked2 = [], []
+        def _hook1(*args, **kw):
+            _hooked1.append((args, kw))
+        def _hook2(*args, **kw):
+            _hooked2.append((args, kw))
+        logger = DummyLogger()
+        with Monkey(_transaction, _LOGGER=logger):
+            t = self._makeOne()
+            t._after_commit.append((_hook1, ('one',), {'uno': 1}))
+            t._after_commit.append((_hook2, (), {}))
+            logger._clear()
+            t.commit()
+        self.assertEqual(_hooked1, [((True, 'one',), {'uno': 1})])
+        self.assertEqual(_hooked2, [((True,), {})])
+        self.assertEqual(t._after_commit, [])
+
+    def test_commit_error_w_afterCompleteHooks(self):
+        from transaction import _transaction
+        from transaction.tests.common import DummyLogger
+        from transaction.tests.common import Monkey
+        class BrokenResource(object):
+            def sortKey(self):
+                return 'zzz'
+            def tpc_begin(self, txn):
+                raise ValueError('test')
+        broken = BrokenResource()
+        class Resource(object):
+            _b = _c = _v = _f = _a = _x =False
+            def sortKey(self):
+                return 'aaa'
+            def tpc_begin(self, txn):
+                self._b = True
+            def commit(self, txn):
+                self._c = True
+            def tpc_vote(self, txn):
+                self._v = True
+            def tpc_finish(self, txn):
+                self._f = True
+            def abort(self, txn):
+                self._a = True
+            def tpc_abort(self, txn):
+                self._x = True
+        resource = Resource()
+        _hooked1, _hooked2 = [], []
+        def _hook1(*args, **kw):
+            _hooked1.append((args, kw))
+        def _hook2(*args, **kw):
+            _hooked2.append((args, kw))
+        logger = DummyLogger()
+        with Monkey(_transaction, _LOGGER=logger):
+            t = self._makeOne()
+            t._after_commit.append((_hook1, ('one',), {'uno': 1}))
+            t._after_commit.append((_hook2, (), {}))
+            t._resources.append(broken)
+            t._resources.append(resource)
+            logger._clear()
+            self.assertRaises(ValueError, t.commit)
+        self.assertEqual(_hooked1, [((False, 'one',), {'uno': 1})])
+        self.assertEqual(_hooked2, [((False,), {})])
+        self.assertEqual(t._after_commit, [])
+        self.assertTrue(resource._b)
+        self.assertFalse(resource._c)
+        self.assertFalse(resource._v)
+        self.assertFalse(resource._f)
+        self.assertTrue(resource._a)
+        self.assertTrue(resource._x)
+
+    def test_commit_error_w_synchronizers(self):
+        from transaction.weakset import WeakSet
+        from transaction.tests.common import DummyLogger
+        from transaction.tests.common import Monkey
+        from transaction import _transaction
+        class _Synch(object):
+            _before = _after = False
+            def beforeCompletion(self, txn):
+                self._before = txn
+            def afterCompletion(self, txn):
+                self._after = txn
+        synchs = [_Synch(), _Synch(), _Synch()]
+        ws = WeakSet()
+        for synch in synchs:
+            ws.add(synch)
+        class BrokenResource(object):
+            def sortKey(self):
+                return 'zzz'
+            def tpc_begin(self, txn):
+                raise ValueError('test')
+        broken = BrokenResource()
+        logger = DummyLogger()
+        with Monkey(_transaction, _LOGGER=logger):
+            t = self._makeOne(synchronizers=ws)
+            logger._clear()
+            t._resources.append(broken)
+            self.assertRaises(ValueError, t.commit)
+        for synch in synchs:
+            self.assertTrue(synch._before is t)
+            self.assertTrue(synch._after is t)  # called in _cleanup
+
     def test_note(self):
         t = self._makeOne()
         try:



More information about the checkins mailing list