[Checkins] SVN: zope.sqlalchemy/trunk/ merge elro-retry-support branch

Laurence Rowe l at lrowe.co.uk
Sat Jul 24 12:18:47 EDT 2010


Log message for revision 115031:
  merge elro-retry-support branch

Changed:
  A   zope.sqlalchemy/trunk/oracle.cfg
  U   zope.sqlalchemy/trunk/postgres.cfg
  U   zope.sqlalchemy/trunk/src/zope/sqlalchemy/README.txt
  U   zope.sqlalchemy/trunk/src/zope/sqlalchemy/datamanager.py
  U   zope.sqlalchemy/trunk/src/zope/sqlalchemy/tests.py

-=-
Copied: zope.sqlalchemy/trunk/oracle.cfg (from rev 115030, zope.sqlalchemy/branches/elro-retry-support/oracle.cfg)
===================================================================
--- zope.sqlalchemy/trunk/oracle.cfg	                        (rev 0)
+++ zope.sqlalchemy/trunk/oracle.cfg	2010-07-24 16:18:47 UTC (rev 115031)
@@ -0,0 +1,33 @@
+# To run the oracle tests I use the oracle developer days VirtualBox image:
+#  http://www.oracle.com/technology/software/products/virtualbox/appliances/index.html
+# For cx_Oracle to build, download instantclient basiclite and sdk from:
+# http://www.oracle.com/technology/software/tech/oci/instantclient/index.html
+
+[buildout]
+extends = buildout.cfg
+# extends = postgres.cfg
+parts += python-oracle cx_Oracle testora
+python = python-oracle
+allow-hosts += *.sourceforge.net
+
+[python-oracle]
+recipe = gocept.cxoracle
+instant-client = ${buildout:directory}/instantclient-basiclite-10.2.0.4.0-macosx-x64.zip
+instant-sdk = instantclient-sdk-10.2.0.4.0-macosx-x64.zip 
+
+[cx_Oracle]
+recipe = zc.recipe.egg:custom
+egg = cx_Oracle
+
+[test]
+eggs += cx_Oracle
+
+[testora]
+<= test
+environment = oraenv
+
+[scripts]
+eggs += cx_Oracle
+
+[oraenv]
+TEST_DSN = oracle://system:oracle@192.168.56.101/orcl

Modified: zope.sqlalchemy/trunk/postgres.cfg
===================================================================
--- zope.sqlalchemy/trunk/postgres.cfg	2010-07-24 16:14:33 UTC (rev 115030)
+++ zope.sqlalchemy/trunk/postgres.cfg	2010-07-24 16:18:47 UTC (rev 115031)
@@ -1,16 +1,22 @@
-# INCLUDEDIR=/opt/local/include/postgresql90 PATH=/opt/local/lib/postgresql90/bin:$PATH bin/buildout -vvvc postgres.cfg
+# PATH=/opt/local/lib/postgresql90/bin:$PATH bin/buildout -c postgres.cfg
+# sudo -u postgres /opt/local/lib/postgresql90/bin/createdb zope_sqlalchemy_tests
+# sudo -u postgres /opt/local/lib/postgresql90/bin/createuser -s <username>
 # sudo -u postgres /opt/local/lib/postgresql90/bin/postgres -D /opt/local/var/db/postgresql90/defaultdb -d 1
 [buildout]
 extends = buildout.cfg
 find-links = http://initd.org/pub/software/psycopg/
 allow-hosts += initd.org
+parts += testpg
 
 [test]
 eggs += psycopg2
-environment = testenv
 
+[testpg]
+<= test
+environment = pgenv
+
 [scripts]
 eggs += psycopg2
 
-[testenv]
+[pgenv]
 TEST_DSN = postgres:///zope_sqlalchemy_tests

Modified: zope.sqlalchemy/trunk/src/zope/sqlalchemy/README.txt
===================================================================
--- zope.sqlalchemy/trunk/src/zope/sqlalchemy/README.txt	2010-07-24 16:14:33 UTC (rev 115030)
+++ zope.sqlalchemy/trunk/src/zope/sqlalchemy/README.txt	2010-07-24 16:18:47 UTC (rev 115031)
@@ -99,7 +99,7 @@
 We can now create a new user and commit the changes using Zope's transaction
 machinery, just as Zope's publisher would.
 
-    >>> session.add(User(name='bob'))
+    >>> session.add(User(id=1, name='bob'))
     >>> transaction.commit()
 
 Engine level connections are outside the scope of the transaction integration.
@@ -115,7 +115,7 @@
     u'bob'
     >>> bob.addresses
     []
-    >>> bob.addresses.append(Address(email='bob@bob.bob'))
+    >>> bob.addresses.append(Address(id=1, email='bob@bob.bob'))
     >>> transaction.commit()
     >>> session = Session()
     >>> bob = session.query(User).all()[0]

Modified: zope.sqlalchemy/trunk/src/zope/sqlalchemy/datamanager.py
===================================================================
--- zope.sqlalchemy/trunk/src/zope/sqlalchemy/datamanager.py	2010-07-24 16:14:33 UTC (rev 115030)
+++ zope.sqlalchemy/trunk/src/zope/sqlalchemy/datamanager.py	2010-07-24 16:18:47 UTC (rev 115031)
@@ -16,9 +16,27 @@
 from zope.interface import implements
 from transaction.interfaces import ISavepointDataManager, IDataManagerSavepoint
 from transaction._transaction import Status as ZopeStatus
+from sqlalchemy.orm.exc import ConcurrentModificationError
+from sqlalchemy.exc import DBAPIError
 from sqlalchemy.orm.session import SessionExtension
 from sqlalchemy.engine.base import Engine
 
+_retryable_errors = []
+try:
+    import psycopg2.extensions
+except ImportError:
+    pass
+else:
+    _retryable_errors.append((psycopg2.extensions.TransactionRollbackError, None))
+
+# ORA-08177: can't serialize access for this transaction
+try:
+    import cx_Oracle
+except ImportError:
+    pass
+else:
+    _retryable_errors.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code==8177))
+
 # The status of the session is stored on the connection info
 STATUS_ACTIVE = 'active' # session joined to transaction, writes allowed.
 STATUS_CHANGED = 'changed' # data has been written
@@ -110,6 +128,18 @@
     
     def _savepoint(self):
         return SessionSavepoint(self.session)
+    
+    def should_retry(self, error):
+        if isinstance(error, ConcurrentModificationError):
+            return True
+        if isinstance(error, DBAPIError):
+            orig = error.orig
+            for error_type, test in _retryable_errors:
+                if isinstance(orig, error_type):
+                    if test is None:
+                        return True
+                    if test(orig):
+                        return True
 
 
 class TwoPhaseSessionDataManager(SessionDataManager):

Modified: zope.sqlalchemy/trunk/src/zope/sqlalchemy/tests.py
===================================================================
--- zope.sqlalchemy/trunk/src/zope/sqlalchemy/tests.py	2010-07-24 16:14:33 UTC (rev 115030)
+++ zope.sqlalchemy/trunk/src/zope/sqlalchemy/tests.py	2010-07-24 16:18:47 UTC (rev 115031)
@@ -29,8 +29,9 @@
 import unittest
 import transaction
 import threading
+import time
 import sqlalchemy as sa
-from sqlalchemy import orm, sql
+from sqlalchemy import orm, sql, exc
 from zope.sqlalchemy import datamanager as tx
 from zope.sqlalchemy import mark_changed
 
@@ -85,6 +86,7 @@
 class TestTwo(SimpleModel): pass
 
 def setup_mappers():
+    orm.clear_mappers()
     # Other tests can clear mappers by calling clear_mappers(),
     # be more robust by setting up mappers in the test setup.
     m1 = orm.mapper(User, test_users,
@@ -152,8 +154,7 @@
     def tearDown(self):
         transaction.abort()
         metadata.drop_all(engine)
-        for m in self.mappers:
-            m.dispose()
+        orm.clear_mappers()
 
     def testAbortBeforeCommit(self):
         # Simulate what happens in a conflict error
@@ -172,7 +173,7 @@
         transaction.begin()
         session = Session()
         conn = session.connection()
-        conn.execute("SELECT 1")
+        conn.execute("SELECT 1 FROM test_users")
         mark_changed(session)
         transaction.commit()        
 
@@ -198,7 +199,7 @@
         transaction.begin()
         session = Session()
         conn = session.connection()
-        conn.execute("SELECT 1")
+        conn.execute("SELECT 1 FROM test_users")
         mark_changed(session)
         transaction.commit()
 
@@ -461,7 +462,87 @@
         results = engine.connect().execute(test_users.select(test_users.c.lastname=="smith"))
         self.assertEqual(len(results.fetchall()), 2)
 
+class RetryTests(unittest.TestCase):
+    
+    def setUp(self):
+        self.mappers = setup_mappers()
+        metadata.drop_all(engine)
+        metadata.create_all(engine)
 
+        self.tm1 = transaction.TransactionManager()
+        self.tm2 = transaction.TransactionManager()
+        # With psycopg2 you might supply isolation_level='SERIALIZABLE' here,
+        # unfortunately that is not supported by cx_Oracle.
+        e1 = sa.create_engine(TEST_DSN)
+        e2 = sa.create_engine(TEST_DSN)
+        self.s1 = orm.sessionmaker(
+            bind=e1,
+            extension=tx.ZopeTransactionExtension(transaction_manager=self.tm1),
+            twophase=TEST_TWOPHASE,
+            )()
+        self.s2 = orm.sessionmaker(
+            bind=e2,
+            extension=tx.ZopeTransactionExtension(transaction_manager=self.tm2),
+            twophase=TEST_TWOPHASE,
+            )()
+        self.tm1.begin()
+        self.s1.add(User(id=1, firstname='udo', lastname='juergens'))
+        self.tm1.commit()
+
+    def tearDown(self):
+        self.tm1.abort()
+        self.tm2.abort()
+        metadata.drop_all(engine)
+        orm.clear_mappers()
+
+    def testRetry(self):
+        # sqlite is unable to run this test as the database is locked
+        tm1, tm2, s1, s2 = self.tm1, self.tm2, self.s1, self.s2
+        # make sure we actually start a session.
+        tm1.begin()
+        self.failUnless(len(s1.query(User).all())==1, "Users table should have one row")
+        tm2.begin()
+        self.failUnless(len(s2.query(User).all())==1, "Users table should have one row")
+        s1.query(User).delete()
+        user = s2.query(User).get(1)
+        user.lastname = u'smith'
+        tm1.commit()
+        raised = False
+        try:
+            s2.flush()
+        except orm.exc.ConcurrentModificationError, e:
+            # This error is thrown when the number of updated rows is not as expected
+            raised = True
+        self.failUnless(raised, "Did not raise expected error")
+        self.failUnless(tm2._retryable(type(e), e), "Error should be retryable")
+
+    def testRetryThread(self):
+        tm1, tm2, s1, s2 = self.tm1, self.tm2, self.s1, self.s2
+        # make sure we actually start a session.
+        tm1.begin()
+        self.failUnless(len(s1.query(User).all())==1, "Users table should have one row")
+        tm2.begin()
+        s2.connection().execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
+        self.failUnless(len(s2.query(User).all())==1, "Users table should have one row")
+        s1.query(User).delete()
+        raised = False
+
+        def target():
+            time.sleep(0.2)
+            tm1.commit()
+
+        thread = threading.Thread(target=target)
+        thread.start()
+        try:
+            user = s2.query(User).with_lockmode('update').get(1)
+        except exc.DBAPIError, e:
+            # This error wraps the underlying DBAPI module error, some of which are retryable
+            raised = True
+        self.failUnless(raised, "Did not raise expected error")
+        self.failUnless(tm2._retryable(type(e), e), "Error should be retryable")
+        thread.join() # well, we must have joined by now
+
+
 class MultipleEngineTests(unittest.TestCase):
         
     def setUp(self):
@@ -475,8 +556,7 @@
         transaction.abort()
         bound_metadata1.drop_all()
         bound_metadata2.drop_all()
-        for m in self.mappers:
-            m.dispose()
+        orm.clear_mappers()
 
     def testTwoEngines(self):
         session = UnboundSession()
@@ -500,8 +580,10 @@
     import doctest
     optionflags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS
     suite = TestSuite()
-    for cls in (ZopeSQLAlchemyTests, MultipleEngineTests):
-        suite.addTest(makeSuite(cls))
+    suite.addTest(makeSuite(ZopeSQLAlchemyTests))
+    suite.addTest(makeSuite(MultipleEngineTests))
+    if TEST_DSN.startswith('postgres') or TEST_DSN.startswith('oracle'):
+        suite.addTest(makeSuite(RetryTests))
     suite.addTest(doctest.DocFileSuite('README.txt', optionflags=optionflags, tearDown=tearDownReadMe,
         globs={'TEST_DSN': TEST_DSN, 'TEST_TWOPHASE': TEST_TWOPHASE}))
     return suite



More information about the checkins mailing list