[Checkins] SVN: zope.sqlalchemy/branches/elro-retry-support/ retry support working under postgres, still some oracle problems

Laurence Rowe l at lrowe.co.uk
Fri Jul 23 14:42:39 EDT 2010


Log message for revision 114967:
  Retry support is working under PostgreSQL; there are still some problems under Oracle.

Changed:
  A   zope.sqlalchemy/branches/elro-retry-support/oracle.cfg
  U   zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/datamanager.py
  U   zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/tests.py

-=-
Added: zope.sqlalchemy/branches/elro-retry-support/oracle.cfg
===================================================================
--- zope.sqlalchemy/branches/elro-retry-support/oracle.cfg	                        (rev 0)
+++ zope.sqlalchemy/branches/elro-retry-support/oracle.cfg	2010-07-23 18:42:39 UTC (rev 114967)
@@ -0,0 +1,24 @@
+[buildout]
+extends = buildout.cfg
+parts += python-oracle cx_Oracle test
+python = python-oracle
+allow-hosts += *.sourceforge.net
+
+[python-oracle]
+recipe = gocept.cxoracle
+instant-client = ${buildout:directory}/instantclient-basiclite-10.2.0.4.0-macosx-x64.zip
+instant-sdk = instantclient-sdk-10.2.0.4.0-macosx-x64.zip 
+
+[cx_Oracle]
+recipe = zc.recipe.egg:custom
+egg = cx_Oracle
+
+[test]
+eggs += cx_Oracle
+environment = testenv
+
+[scripts]
+eggs += cx_Oracle
+
+[testenv]
+TEST_DSN = oracle://system:oracle@192.168.56.101/orcl

Modified: zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/datamanager.py
===================================================================
--- zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/datamanager.py	2010-07-23 18:41:59 UTC (rev 114966)
+++ zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/datamanager.py	2010-07-23 18:42:39 UTC (rev 114967)
@@ -16,9 +16,27 @@
 from zope.interface import implements
 from transaction.interfaces import ISavepointDataManager, IDataManagerSavepoint
 from transaction._transaction import Status as ZopeStatus
+from sqlalchemy.orm.exc import ConcurrentModificationError
+from sqlalchemy.exc import DBAPIError
 from sqlalchemy.orm.session import SessionExtension
 from sqlalchemy.engine.base import Engine
 
+_retryable_errors = []
+try:
+    import psycopg2.extensions
+except ImportError:
+    pass
+else:
+    _retryable_errors.append((psycopg2.extensions.TransactionRollbackError, None))
+
+# ORA-08177: can't serialize access for this transaction
+try:
+    import cx_Oracle
+except ImportError:
+    pass
+else:
+    _retryable_errors.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code==8177))
+
 # The status of the session is stored on the connection info
 STATUS_ACTIVE = 'active' # session joined to transaction, writes allowed.
 STATUS_CHANGED = 'changed' # data has been written
@@ -110,6 +128,18 @@
     
     def _savepoint(self):
         return SessionSavepoint(self.session)
+    
+    def should_retry(self, error):
+        if isinstance(error, ConcurrentModificationError):
+            return True
+        if isinstance(error, DBAPIError):
+            orig = error.orig
+            for error_type, test in _retryable_errors:
+                if isinstance(orig, error_type):
+                    if test is None:
+                        return True
+                    if test(orig):
+                        return True
 
 
 class TwoPhaseSessionDataManager(SessionDataManager):

Modified: zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/tests.py
===================================================================
--- zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/tests.py	2010-07-23 18:41:59 UTC (rev 114966)
+++ zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/tests.py	2010-07-23 18:42:39 UTC (rev 114967)
@@ -29,8 +29,9 @@
 import unittest
 import transaction
 import threading
+import time
 import sqlalchemy as sa
-from sqlalchemy import orm, sql
+from sqlalchemy import orm, sql, exc
 from zope.sqlalchemy import datamanager as tx
 from zope.sqlalchemy import mark_changed
 
@@ -85,6 +86,7 @@
 class TestTwo(SimpleModel): pass
 
 def setup_mappers():
+    orm.clear_mappers()
     # Other tests can clear mappers by calling clear_mappers(),
     # be more robust by setting up mappers in the test setup.
     m1 = orm.mapper(User, test_users,
@@ -152,8 +154,7 @@
     def tearDown(self):
         transaction.abort()
         metadata.drop_all(engine)
-        for m in self.mappers:
-            m.dispose()
+        orm.clear_mappers()
 
     def testAbortBeforeCommit(self):
         # Simulate what happens in a conflict error
@@ -172,7 +173,7 @@
         transaction.begin()
         session = Session()
         conn = session.connection()
-        conn.execute("SELECT 1")
+        conn.execute("SELECT 1 FROM test_users")
         mark_changed(session)
         transaction.commit()        
 
@@ -198,7 +199,7 @@
         transaction.begin()
         session = Session()
         conn = session.connection()
-        conn.execute("SELECT 1")
+        conn.execute("SELECT 1 FROM test_users")
         mark_changed(session)
         transaction.commit()
 
@@ -461,7 +462,83 @@
         results = engine.connect().execute(test_users.select(test_users.c.lastname=="smith"))
         self.assertEqual(len(results.fetchall()), 2)
 
+class RetryTests(unittest.TestCase):
+    
+    def setUp(self):
+        self.mappers = setup_mappers()
+        metadata.drop_all(engine)
+        metadata.create_all(engine)
 
+        self.tm1 = transaction.TransactionManager()
+        self.tm2 = transaction.TransactionManager()
+        e1 = sa.create_engine(TEST_DSN, isolation_level='SERIALIZABLE', echo=True)
+        e2 = sa.create_engine(TEST_DSN, isolation_level='SERIALIZABLE', echo=True)
+        self.s1 = orm.sessionmaker(
+            bind=e1,
+            extension=tx.ZopeTransactionExtension(transaction_manager=self.tm1),
+            twophase=TEST_TWOPHASE,
+            )()
+        self.s2 = orm.sessionmaker(
+            bind=e2,
+            extension=tx.ZopeTransactionExtension(transaction_manager=self.tm2),
+            twophase=TEST_TWOPHASE,
+            )()
+        self.tm1.begin()
+        self.s1.add(User(id=1, firstname='udo', lastname='juergens'))
+        self.tm1.commit()
+
+    def tearDown(self):
+        self.tm1.abort()
+        self.tm2.abort()
+        metadata.drop_all(engine)
+        orm.clear_mappers()
+
+    def testRetry(self):
+        tm1, tm2, s1, s2 = self.tm1, self.tm2, self.s1, self.s2
+        # make sure we actually start a session.
+        tm1.begin()
+        self.failUnless(len(s1.query(User).all())==1, "Users table should have one row")
+        tm2.begin()
+        self.failUnless(len(s2.query(User).all())==1, "Users table should have one row")
+        s1.query(User).delete()
+        user = s2.query(User).get(1)
+        user.lastname = u'smith'
+        tm1.commit()
+        raised = False
+        try:
+            s2.flush()
+        except orm.exc.ConcurrentModificationError, e:
+            # This error is thrown when the number of updated rows is not as expected
+            raised = True
+        self.failUnless(raised, "Did not raise expected error")
+        self.failUnless(tm2._retryable(type(e), e), "Error should be retryable")
+
+    def testRetryThread(self):
+        tm1, tm2, s1, s2 = self.tm1, self.tm2, self.s1, self.s2
+        # make sure we actually start a session.
+        tm1.begin()
+        self.failUnless(len(s1.query(User).all())==1, "Users table should have one row")
+        tm2.begin()
+        self.failUnless(len(s2.query(User).all())==1, "Users table should have one row")
+        s1.query(User).delete()
+        raised = False
+
+        def target():
+            time.sleep(0.2)
+            tm1.commit()
+
+        thread = threading.Thread(target=target)
+        thread.start()
+        try:
+            user = s2.query(User).with_lockmode('update').get(1)
+        except exc.DBAPIError, e:
+            # This error wraps the underlying DBAPI module error, some of which are retryable
+            raised = True
+        self.failUnless(raised, "Did not raise expected error")
+        self.failUnless(tm2._retryable(type(e), e), "Error should be retryable")
+        thread.join() # well, we must have joined by now
+
+
 class MultipleEngineTests(unittest.TestCase):
         
     def setUp(self):
@@ -475,8 +552,7 @@
         transaction.abort()
         bound_metadata1.drop_all()
         bound_metadata2.drop_all()
-        for m in self.mappers:
-            m.dispose()
+        orm.clear_mappers()
 
     def testTwoEngines(self):
         session = UnboundSession()
@@ -500,7 +576,7 @@
     import doctest
     optionflags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS
     suite = TestSuite()
-    for cls in (ZopeSQLAlchemyTests, MultipleEngineTests):
+    for cls in (ZopeSQLAlchemyTests, MultipleEngineTests, RetryTests):
         suite.addTest(makeSuite(cls))
     suite.addTest(doctest.DocFileSuite('README.txt', optionflags=optionflags, tearDown=tearDownReadMe,
         globs={'TEST_DSN': TEST_DSN, 'TEST_TWOPHASE': TEST_TWOPHASE}))



More information about the checkins mailing list