[Checkins] SVN: zope.sqlalchemy/trunk/ zope.sqlalchemy first checkin

Laurence Rowe l at lrowe.co.uk
Mon May 5 15:24:57 EDT 2008


Log message for revision 86464:
  zope.sqlalchemy first checkin

Changed:
  A   zope.sqlalchemy/trunk/README.txt
  A   zope.sqlalchemy/trunk/bootstrap.py
  A   zope.sqlalchemy/trunk/buildout.cfg
  A   zope.sqlalchemy/trunk/setup.py
  A   zope.sqlalchemy/trunk/sqlalchemy.diff
  A   zope.sqlalchemy/trunk/src/
  A   zope.sqlalchemy/trunk/src/zope/
  A   zope.sqlalchemy/trunk/src/zope/__init__.py
  A   zope.sqlalchemy/trunk/src/zope/sqlalchemy/
  A   zope.sqlalchemy/trunk/src/zope/sqlalchemy/README.txt
  A   zope.sqlalchemy/trunk/src/zope/sqlalchemy/__init__.py
  A   zope.sqlalchemy/trunk/src/zope/sqlalchemy/datamanager.py
  A   zope.sqlalchemy/trunk/src/zope/sqlalchemy/tests.py

-=-
Added: zope.sqlalchemy/trunk/README.txt
===================================================================
--- zope.sqlalchemy/trunk/README.txt	                        (rev 0)
+++ zope.sqlalchemy/trunk/README.txt	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,5 @@
+=================
+ zope.sqlalchemy
+=================
+
+Zope/SQLAlchemy transaction integration. See src/zope/sqlalchemy/README.txt.

Added: zope.sqlalchemy/trunk/bootstrap.py
===================================================================
--- zope.sqlalchemy/trunk/bootstrap.py	                        (rev 0)
+++ zope.sqlalchemy/trunk/bootstrap.py	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Bootstrap a buildout-based project
+
+Simply run this script in a directory containing a buildout.cfg.
+The script accepts buildout command-line options, so you can
+use the -c option to specify an alternate configuration file.
+
+$Id: bootstrap.py 71258 2006-11-21 22:22:48Z jim $
+"""
+
+import os, shutil, sys, tempfile, urllib2
+
+tmpeggs = tempfile.mkdtemp()
+
+ez = {}
+exec urllib2.urlopen('http://peak.telecommunity.com/dist/ez_setup.py'
+                     ).read() in ez
+ez['use_setuptools'](to_dir=tmpeggs, download_delay=0)
+
+import pkg_resources
+
+cmd = 'from setuptools.command.easy_install import main; main()'
+if sys.platform == 'win32':
+    cmd = '"%s"' % cmd # work around spawn lamosity on windows
+
+ws = pkg_resources.working_set
+assert os.spawnle(
+    os.P_WAIT, sys.executable, sys.executable,
+    '-c', cmd, '-mqNxd', tmpeggs, 'zc.buildout',
+    dict(os.environ,
+         PYTHONPATH=
+         ws.find(pkg_resources.Requirement.parse('setuptools')).location
+         ),
+    ) == 0
+
+ws.add_entry(tmpeggs)
+ws.require('zc.buildout')
+import zc.buildout.buildout
+zc.buildout.buildout.main(sys.argv[1:] + ['bootstrap'])
+shutil.rmtree(tmpeggs)

Added: zope.sqlalchemy/trunk/buildout.cfg
===================================================================
--- zope.sqlalchemy/trunk/buildout.cfg	                        (rev 0)
+++ zope.sqlalchemy/trunk/buildout.cfg	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,13 @@
+[buildout]
+develop = .
+parts = test scripts
+find-links = http://download.zope.org/distribution/
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = zope.sqlalchemy
+
+[scripts]
+recipe = zc.recipe.egg
+eggs = zope.sqlalchemy
+interpreter = py
\ No newline at end of file

Added: zope.sqlalchemy/trunk/setup.py
===================================================================
--- zope.sqlalchemy/trunk/setup.py	                        (rev 0)
+++ zope.sqlalchemy/trunk/setup.py	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,43 @@
+from setuptools import setup, find_packages
+import sys, os
+
+def read(*rnames):
+    return open(os.path.join(os.path.dirname(__file__), *rnames)).read()
+
+version = '0.1'
+
+setup(name='zope.sqlalchemy',
+      version=version,
+      description="Minimal Zope/SQLAlchemy transaction integration",
+      long_description=read('src', 'zope', 'sqlalchemy', 'README.txt'),
+      # Get more strings from http://www.python.org/pypi?%3Aaction=list_classifiers
+      classifiers=[
+        "Framework :: Zope3",
+        "Programming Language :: Python",
+        "License :: OSI Approved :: Zope Public License",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+        ],
+      keywords='',
+      author='Laurence Rowe',
+      author_email='laurence at lrowe.co.uk',
+      url='http://svn.zope.org/zope.sqlalchemy',
+      license='ZPL 2.1',
+      packages=find_packages('src'),
+      package_dir = {'':'src'},
+      namespace_packages=['zope'],
+      include_package_data=True,
+      zip_safe=False,
+      install_requires=[
+          # -*- Extra requirements: -*-
+          'setuptools',
+          'SQLAlchemy>=0.4.4',
+          'transaction',
+          'zope.interface',
+      ],
+      entry_points="""
+      # -*- Entry points: -*-
+      """,
+      extras_require = dict(
+              test = ['zope.testing'],
+              ),
+      )

Added: zope.sqlalchemy/trunk/sqlalchemy.diff
===================================================================
--- zope.sqlalchemy/trunk/sqlalchemy.diff	                        (rev 0)
+++ zope.sqlalchemy/trunk/sqlalchemy.diff	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,67 @@
+Index: test/orm/session.py
+===================================================================
+--- test/orm/session.py	(revision 4574)
++++ test/orm/session.py	(working copy)
+@@ -881,19 +881,20 @@
+                 log.append('after_flush')
+             def after_flush_postexec(self, session, flush_context):
+                 log.append('after_flush_postexec')
++            def after_begin(self, session, transaction, connection):
++                log.append('after_begin')
+         sess = create_session(extension = MyExt())
+         u = User()
+         sess.save(u)
+         sess.flush()
++        assert log == ['before_flush', 'after_begin', 'after_flush', 'before_commit', 'after_commit', 'after_flush_postexec']
+ 
+-        assert log == ['before_flush', 'after_flush', 'before_commit', 'after_commit', 'after_flush_postexec']
+-
+         log = []
+         sess = create_session(transactional=True, extension=MyExt())
+         u = User()
+         sess.save(u)
+         sess.flush()
+-        assert log == ['before_flush', 'after_flush', 'after_flush_postexec']
++        assert log == ['before_flush', 'after_begin', 'after_flush', 'after_flush_postexec']
+ 
+         log = []
+         u.user_name = 'ed'
+@@ -903,6 +904,11 @@
+         log = []
+         sess.commit()
+         assert log == ['before_commit', 'after_commit']
++        
++        log = []
++        sess = create_session(transactional=True, extension=MyExt(), bind=testing.db)
++        conn = sess.connection()
++        assert log == ['after_begin']
+ 
+     def test_pickled_update(self):
+         mapper(User, users)
+Index: lib/sqlalchemy/orm/session.py
+===================================================================
+--- lib/sqlalchemy/orm/session.py	(revision 4574)
++++ lib/sqlalchemy/orm/session.py	(working copy)
+@@ -127,6 +127,13 @@
+         state.  An actual commit() may or may not have occured, depending on whether or not
+         the flush started its own transaction or participated in a larger transaction.
+         """
++    
++    def after_begin(self, session, transaction, connection):
++        """Execute after a transaction is begun on a connection
++        
++        `transaction` is the SessionTransaction. This method is called after an
++        engine level transaction is begun on a connection.
++        """
+ 
+ class SessionTransaction(object):
+     """Represents a Session-level Transaction.
+@@ -214,6 +221,8 @@
+             transaction = conn.begin()
+         
+         self._connections[conn] = self._connections[conn.engine] = (conn, transaction, conn is not bind)
++        if self.session.extension is not None:
++            self.session.extension.after_begin(self.session, self, conn)
+         return conn
+ 
+     def prepare(self):

Added: zope.sqlalchemy/trunk/src/zope/__init__.py
===================================================================
--- zope.sqlalchemy/trunk/src/zope/__init__.py	                        (rev 0)
+++ zope.sqlalchemy/trunk/src/zope/__init__.py	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+try:
+    __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+    from pkgutil import extend_path
+    __path__ = extend_path(__path__, __name__)

Added: zope.sqlalchemy/trunk/src/zope/sqlalchemy/README.txt
===================================================================
--- zope.sqlalchemy/trunk/src/zope/sqlalchemy/README.txt	                        (rev 0)
+++ zope.sqlalchemy/trunk/src/zope/sqlalchemy/README.txt	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,153 @@
+=================
+ zope.sqlalchemy
+=================
+
+The aim of this package is to unify the plethora of existing packages
+integrating SQLAlchemy with Zope's transaction management. As such it seeks
+only to provide a data manager and makes no attempt to define a `zopeish` way
+to configure engines.
+
+You need to understand SQLAlchemy for this package and this README to make 
+any sense. See http://sqlalchemy.org/docs/.
+
+
+Running the tests
+-----------------
+
+NOTE: Currently you must patch SQLAlchemy with the included sqlalchemy.diff (see bug #1023).
+
+This package is distributed as a buildout. Using your desired python run:
+
+$ python bootstrap.py
+
+This will download the dependent packages and set up the test script, which may
+be run with:
+
+$ ./bin/test
+
+To enable testing with your own database set the TEST_DSN environment variable
+to your sqlalchemy database dsn. Two-phase commit behaviour may be tested by
+setting the TEST_TWOPHASE variable to a non-empty string, e.g.:
+
+$ TEST_DSN=postgres://test:test@localhost/test TEST_TWOPHASE=True bin/test
+
+Example
+-------
+
+This example is lifted directly from the SQLAlchemy declarative documentation.
+First the necessary imports.
+
+	>>> from sqlalchemy import *
+	>>> from sqlalchemy.ext.declarative import declarative_base
+	>>> from sqlalchemy.orm import scoped_session, sessionmaker, relation
+	>>> from zope.sqlalchemy import ZopeTransactionExtension, invalidate
+	>>> import transaction
+
+Now to define the mapper classes.
+
+	>>> Base = declarative_base()
+	>>> class User(Base):
+	...     __tablename__ = 'test_users'
+	...     id = Column('id', Integer, primary_key=True)
+	...     name = Column('name', String(50))
+	...     addresses = relation("Address", backref="user")
+	>>> class Address(Base):
+	...     __tablename__ = 'test_addresses'
+	...     id = Column('id', Integer, primary_key=True)
+	...     email = Column('email', String(50))
+	...     user_id = Column('user_id', Integer, ForeignKey('test_users.id'))
+
+Create an engine and set up the tables. Note that for this example to work a
+recent version of sqlite/pysqlite is required. 3.4.0 seems to be sufficient.
+
+	>>> engine = create_engine(TEST_DSN, convert_unicode=True)
+	>>> Base.metadata.create_all(engine)
+
+Now to create the session itself. As zope is a threaded web server we must use
+scoped sessions. Zope and SQLAlchemy sessions are tied together by using the
+ZopeTransactionExtension from this package.
+
+	>>> Session = scoped_session(sessionmaker(bind=engine, twophase=TEST_TWOPHASE,
+	... transactional=True, autoflush=True, extension=ZopeTransactionExtension()))
+
+Call the scoped session factory to retrieve a session. You may call this as
+many times as you like within a transaction and you will always retrieve the
+same session. At present there are no users in the database.
+
+	>>> session = Session()
+	>>> session.query(User).all()
+	[]
+
+We can now create a new user and commit the changes using Zope's transaction
+machinery, just as Zope's publisher would.
+
+	>>> session.save(User(name='bob'))
+	>>> transaction.commit()
+
+Engine level connections are outside the scope of the transaction integration.
+
+	>>> engine.connect().execute('SELECT * FROM test_users').fetchall()
+	[(1, ...'bob')]
+
+A new transaction requires a new session. Let's add an address.
+
+	>>> session = Session()
+	>>> bob = session.query(User).all()[0]
+	>>> bob.name
+	u'bob'
+	>>> bob.addresses
+	[]
+	>>> bob.addresses.append(Address(email='bob at bob.bob'))
+	>>> transaction.commit()
+	>>> session = Session()
+	>>> bob = session.query(User).all()[0]
+	>>> bob.addresses
+	[<Address object at ...>]
+	>>> bob.addresses[0].email
+	u'bob at bob.bob'
+	>>> bob.addresses[0].email = 'wrong at wrong'    
+
+To roll back a transaction, use transaction.abort().
+
+	>>> transaction.abort()
+	>>> session = Session()
+	>>> bob = session.query(User).all()[0]
+	>>> bob.addresses[0].email
+	u'bob at bob.bob'
+	>>> transaction.abort()
+
+By default, zope.sqlalchemy puts sessions in an 'active' state when they are
+first used. ORM write operations automatically move the session into an
+'invalidated' state. This avoids unnecessary database commits. Sometimes it
+is necessary to interact with the database directly through SQL. It is not
+possible to guess whether such an operation is a read or a write. Therefore we
+must manually mark the session as invalidated when manual SQL statements write
+to the DB.
+
+	>>> session = Session()
+	>>> conn = session.connection()
+	>>> users = Base.metadata.tables['test_users']
+	>>> conn.execute(users.update(users.c.name=='bob'), name='ben')
+	<sqlalchemy.engine.base.ResultProxy object at ...>
+	>>> from zope.sqlalchemy import invalidate
+	>>> invalidate(session)
+	>>> transaction.commit()
+	>>> session = Session()
+	>>> session.query(User).all()[0].name
+	u'ben'
+	>>> transaction.abort()
+
+If this is a problem you may tell the extension to place the session in the
+'invalidated' state initially.
+
+	>>> Session.configure(extension=ZopeTransactionExtension('invalidated'))
+	>>> Session.remove()
+	>>> session = Session()
+	>>> conn = session.connection()
+	>>> conn.execute(users.update(users.c.name=='ben'), name='bob')
+	<sqlalchemy.engine.base.ResultProxy object at ...>
+	>>> transaction.commit()
+	>>> session = Session()
+	>>> session.query(User).all()[0].name
+	u'bob'
+	>>> transaction.abort()

Added: zope.sqlalchemy/trunk/src/zope/sqlalchemy/__init__.py
===================================================================
--- zope.sqlalchemy/trunk/src/zope/sqlalchemy/__init__.py	                        (rev 0)
+++ zope.sqlalchemy/trunk/src/zope/sqlalchemy/__init__.py	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,15 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+from datamanager import ZopeTransactionExtension, invalidate

Added: zope.sqlalchemy/trunk/src/zope/sqlalchemy/datamanager.py
===================================================================
--- zope.sqlalchemy/trunk/src/zope/sqlalchemy/datamanager.py	                        (rev 0)
+++ zope.sqlalchemy/trunk/src/zope/sqlalchemy/datamanager.py	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,190 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+import transaction as zope_transaction
+from zope.interface import implements
+from transaction.interfaces import ISavepointDataManager, IDataManagerSavepoint
+from sqlalchemy.orm.session import SessionExtension
+from sqlalchemy.engine.base import Engine
+
+# The status of the session is stored on the connection info
+STATUS_ACTIVE = 'active' # session joined to transaction, writes allowed.
+STATUS_INVALIDATED = 'invalidated' # data has been written
+STATUS_READONLY = 'readonly' # session joined to transaction, no writes allowed.
+
+NO_SAVEPOINT_SUPPORT = set(['sqlite'])
+
+
+_SESSION_STATE = {} # a mapping of id(session) -> status
+# This is thread safe because you are using scoped sessions
+
+
+#
+# The two variants of the DataManager.
+#
+
+class SessionDataManager(object):
+    """Integrate a top level sqlalchemy session transaction into a zope transaction
+    
+    One phase variant.
+    """
+    
+    implements(ISavepointDataManager)
+
+    def __init__(self, session, status):
+        if session.transactional:
+            self.tx = session.transaction._iterate_parents()[-1]
+        else:
+            assert session.transaction is None
+            self.tx = session.begin()
+        self.session = session
+        _SESSION_STATE[id(session)] = status
+        self.state = 'init'
+
+    def abort(self, trans):
+        if self.tx is not None: # this could happen after a tpc_abort
+            del _SESSION_STATE[id(self.session)]
+            self.session.close()
+            self.tx = self.session = None
+            self.state = 'aborted'
+
+    def tpc_begin(self, trans):
+        self.session._autoflush()
+    
+    def commit(self, trans):
+        status = _SESSION_STATE[id(self.session)]
+        del _SESSION_STATE[id(self.session)]
+        if status is not STATUS_INVALIDATED:
+            self.session.close()
+            self.tx = self.session = None
+            self.state = 'no work'
+
+    def tpc_vote(self, trans):
+        # for a one phase data manager commit last in tpc_vote
+        if self.tx is not None: # there may have been no work to do
+            self.tx.commit()
+            self.session.close()
+            self.tx = self.session = None
+            self.state = 'finished on vote'
+                
+    def tpc_finish(self, trans):
+        pass
+
+    def tpc_abort(self, trans):
+        raise TypeError("Already committed")
+
+    def sortKey(self):
+        # Try to sort last, so that we vote last - we may commit in tpc_vote(),
+        # which allows Zope to roll back its transaction if the RDBMS 
+        # threw a conflict error.
+        return "~sqlalchemy:%d" % id(self.tx)
+    
+    @property
+    def savepoint(self):
+        """Savepoints are only supported when all connections support subtransactions
+        """
+        if set(engine.url.drivername
+            for engine in self.session.transaction._connections.keys()
+            if isinstance(engine, Engine)
+            ).intersection(NO_SAVEPOINT_SUPPORT):
+            raise AttributeError('savepoint')
+        return self._savepoint
+    
+    def _savepoint(self):
+        return SessionSavepoint(self.session)
+
+
+class TwoPhaseSessionDataManager(SessionDataManager):
+    """Two phase variant.
+    """
+    def tpc_vote(self, trans):
+        if self.tx is not None: # there may have been no work to do
+            self.tx.prepare()
+            self.state = 'voted'
+                
+    def tpc_finish(self, trans):
+        if self.tx is not None:
+            self.tx.commit()
+            self.session.close()
+            self.tx = self.session = None
+            self.state = 'finished'
+
+    def tpc_abort(self, trans):
+        if self.tx is not None: # we may not have voted, and been aborted already
+            self.tx._rollback_impl() # XXX change to self.tx.rollback() when #1024 fixed
+            self.session.close()
+            self.tx = self.session = None
+            self.state = 'aborted commit'
+
+    def sortKey(self):
+        # Sort normally
+        return "sqlalchemy.twophase:%d" % id(self.tx)
+
+
+class SessionSavepoint:
+    implements(IDataManagerSavepoint)
+
+    def __init__(self, session):
+        self.session = session
+        self.transaction = session.begin_nested()
+        session.flush() # do I want to do this? Probably.
+
+    def rollback(self):
+        # no need to check validity, sqlalchemy should raise an exception. I think.
+        self.transaction.rollback()
+        self.session.clear() # remove when Session.rollback does an attribute_manager.rollback
+
+
+def join_transaction(session, initial_state=STATUS_ACTIVE):
+    """Join a session to a transaction using the appropriate datamanager.
+       
+    It is safe to call this multiple times, if the session is already joined
+    then it just returns.
+       
+    `initial_state` is either STATUS_ACTIVE, STATUS_INVALIDATED or STATUS_READONLY
+    
+    If using the default initial status of STATUS_ACTIVE, you must ensure that
+    invalidate(session) is called when data is written to the database.
+    
+    The ZopeTransactionExtension SessionExtension can be used to ensure that
+    this is called automatically after session write operations.
+    """
+    if _SESSION_STATE.get(id(session), None) is None:
+        DataManager = session.twophase and TwoPhaseSessionDataManager or SessionDataManager
+        zope_transaction.get().join(DataManager(session, initial_state))
+
+def invalidate(session):
+    """Mark a session as needing to be committed
+    """
+    assert _SESSION_STATE[id(session)] is not STATUS_READONLY
+    _SESSION_STATE[id(session)] = STATUS_INVALIDATED
+
+
+class ZopeTransactionExtension(SessionExtension):
+    """Record that a flush has occurred on a session's connection. This allows
+    the DataManager to rollback rather than commit on read only transactions.
+    """
+    
+    def __init__(self, initial_state=STATUS_ACTIVE):
+        SessionExtension.__init__(self)
+        self.initial_state = initial_state
+    
+    def after_begin(self, session, transaction, connection):
+        join_transaction(session, self.initial_state)
+    
+    def after_flush(self, session, flush_context):
+        invalidate(session)
+    
+    def before_commit(self, session):
+        assert zope_transaction.get().status == 'Committing', "Transaction must be committed by zope"

Added: zope.sqlalchemy/trunk/src/zope/sqlalchemy/tests.py
===================================================================
--- zope.sqlalchemy/trunk/src/zope/sqlalchemy/tests.py	                        (rev 0)
+++ zope.sqlalchemy/trunk/src/zope/sqlalchemy/tests.py	2008-05-05 19:24:56 UTC (rev 86464)
@@ -0,0 +1,407 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+# Much inspiration from z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/tests/testSQLAlchemy.py
+#
+# You may want to run the tests with your database. To do so set the environment variable
+# TEST_DSN to the connection url. e.g.:
+# export TEST_DSN=postgres://plone:plone@localhost/test
+# export TEST_DSN=mssql://plone:plone@/test?dsn=mydsn
+#
+# To test in twophase commit mode export TEST_TWOPHASE=True 
+#
+# NOTE: The sqlite that ships with Mac OS X 10.4 is buggy. Install a newer version (3.5.6)
+#       and rebuild pysqlite2 against it.
+
+
+import os
+import unittest
+import transaction
+import threading
+import sqlalchemy as sa
+from sqlalchemy import orm, sql
+from zope.sqlalchemy import datamanager as tx
+
+TEST_TWOPHASE = bool(os.environ.get('TEST_TWOPHASE'))
+TEST_DSN = os.environ.get('TEST_DSN', 'sqlite:///:memory:')
+
+
+class SimpleModel(object):
+    def __init__(self, **kw):
+        for k, v in kw.items():
+            setattr(self, k, v)
+    def asDict(self):
+        return dict((k, v) for k, v in self.__dict__.items() if not k.startswith('_'))
+
+class User(SimpleModel): pass
+class Skill(SimpleModel): pass
+
+
+engine = sa.create_engine(TEST_DSN)
+engine2 = sa.create_engine(TEST_DSN)
+
+Session = orm.scoped_session(orm.sessionmaker(
+    bind=engine,
+    extension=tx.ZopeTransactionExtension(),
+    transactional=True,
+    autoflush=True,
+    twophase=TEST_TWOPHASE,
+    ))
+
+UnboundSession = orm.scoped_session(orm.sessionmaker(
+    extension=tx.ZopeTransactionExtension(),
+    transactional=True,
+    autoflush=True,
+    twophase=TEST_TWOPHASE,
+    ))
+
+metadata = sa.MetaData() # best to use unbound metadata
+
+test_users = sa.Table('test_users', metadata,
+    sa.Column('id', sa.Integer, primary_key=True),
+    sa.Column('firstname', sa.VARCHAR(255)), # mssql cannot do equality on a text type
+    sa.Column('lastname', sa.VARCHAR(255)),
+    )    
+test_skills = sa.Table('test_skills', metadata,
+    sa.Column('id', sa.Integer, primary_key=True),
+    sa.Column('user_id', sa.Integer),
+    sa.Column('name', sa.VARCHAR(255)),
+    sa.ForeignKeyConstraint(('user_id',), ('test_users.id',)),
+    )
+
+orm.mapper(User, test_users,
+    properties = {
+        'skills': orm.relation(Skill,
+            primaryjoin=test_users.columns['id']==test_skills.columns['user_id']),
+    })
+orm.mapper(Skill, test_skills)
+
+bound_metadata1 = sa.MetaData(engine)
+bound_metadata2 = sa.MetaData(engine2)
+
+test_one = sa.Table('test_one', bound_metadata1, sa.Column('id', sa.Integer, primary_key=True))
+test_two = sa.Table('test_two', bound_metadata2, sa.Column('id', sa.Integer, primary_key=True))
+
+class TestOne(SimpleModel): pass
+class TestTwo(SimpleModel): pass
+
+orm.mapper(TestOne, test_one)
+orm.mapper(TestTwo, test_two)
+
+
+class DummyException(Exception):
+    pass
+ 
+class DummyTargetRaised(DummyException):
+    pass  
+
+class DummyTargetResult(DummyException):
+    pass
+
+class DummyDataManager(object):
+    def __init__(self, key, target=None, args=(), kwargs={}):
+        self.key = key
+        self.target = target
+        self.args = args
+        self.kwargs = kwargs
+    
+    def abort(self, trans):
+        pass
+
+    def tpc_begin(self, trans):
+        pass
+    
+    def commit(self, trans):
+        pass
+
+    def tpc_vote(self, trans):
+        if self.target is not None:
+            try:
+                result = self.target(*self.args, **self.kwargs)
+            except Exception, e:
+                raise DummyTargetRaised(e)
+            raise DummyTargetResult(result)
+        else:
+            raise DummyException('DummyDataManager cannot commit')
+
+    def tpc_finish(self, trans):
+        pass
+
+    def tpc_abort(self, trans):
+        pass
+    
+    def sortKey(self):
+        return self.key
+
+
+class ZopeSQLAlchemyTests(unittest.TestCase):
+        
+    def setUp(self):
+        metadata.drop_all(engine)
+        metadata.create_all(engine)
+    
+    def tearDown(self):
+        transaction.abort()
+        metadata.drop_all(engine)
+
+    def testSimplePopulation(self):
+        session = Session()
+        query = session.query(User)
+        rows = query.all()
+        self.assertEqual(len(rows), 0)
+               
+        session.save(User(id=1, firstname='udo', lastname='juergens'))
+        session.save(User(id=2, firstname='heino', lastname='n/a'))
+        session.flush()
+        
+        rows = query.order_by(query.table.c.id).all()
+        self.assertEqual(len(rows), 2)
+        row1 = rows[0]
+        d = row1.asDict()
+        self.assertEqual(d, {'firstname' : 'udo', 'lastname' : 'juergens', 'id' : 1})
+        
+        # bypass the session machinery
+        stmt = sql.select(query.table.columns).order_by('id')
+        conn = session.connection()
+        results = conn.execute(stmt)
+        self.assertEqual(results.fetchall(), [(1, u'udo', u'juergens'), (2, u'heino', u'n/a')])
+        
+    def testRelations(self):
+        session = Session()
+        session.save(User(id=1,firstname='foo', lastname='bar'))
+
+        user = session.query(User).filter_by(firstname='foo')[0]
+        user.skills.append(Skill(id=1, name='Zope'))
+        session.flush()
+    
+    def testTransactionJoining(self):
+        transaction.abort() # clean slate
+        t = transaction.get()
+        self.failIf([r for r in t._resources if isinstance(r, tx.SessionDataManager)],
+             "Joined transaction too early")
+        conn = Session().connection()
+        self.failUnless([r for r in t._resources if isinstance(r, tx.SessionDataManager)],
+             "Not joined transaction")
+    
+    def testSavepoint(self):
+        use_savepoint = not engine.url.drivername in tx.NO_SAVEPOINT_SUPPORT
+        t = transaction.get()
+        session = Session()
+        query = session.query(User)
+        self.failIf(query.all(), "Users table should be empty")
+        
+        s0 = t.savepoint(optimistic=True) # this should always work
+        
+        if not use_savepoint:
+            self.assertRaises(TypeError, t.savepoint)
+            return # sqlite databases do not support savepoints
+        
+        s1 = t.savepoint()
+        session.save(User(id=1, firstname='udo', lastname='juergens'))
+        session.flush()
+        self.failUnless(len(query.all())==1, "Users table should have one row")
+        
+        s2 = t.savepoint()
+        session.save(User(id=2, firstname='heino', lastname='n/a'))
+        session.flush()
+        self.failUnless(len(query.all())==2, "Users table should have two rows")
+        
+        s2.rollback()
+        self.failUnless(len(query.all())==1, "Users table should have one row")
+        
+        s1.rollback()
+        self.failIf(query.all(), "Users table should be empty")
+    
+    def testCommit(self):
+        session = Session()
+        
+        use_savepoint = not engine.url.drivername in tx.NO_SAVEPOINT_SUPPORT
+        query = session.query(User)
+        rows = query.all()
+        self.assertEqual(len(rows), 0)
+        
+        transaction.commit() # test that a non-modifying transaction works
+        
+        session = Session()
+        query = session.query(User)
+
+        session.save(User(id=1, firstname='udo', lastname='juergens'))
+        session.save(User(id=2, firstname='heino', lastname='n/a'))
+        session.flush()
+
+        rows = query.order_by(query.table.c.id).all()
+        self.assertEqual(len(rows), 2)
+        
+        transaction.abort() # test that the abort really aborts
+        session = Session()
+        query = session.query(User)
+        rows = query.order_by(query.table.c.id).all()
+        self.assertEqual(len(rows), 0)
+        
+        session.save(User(id=1, firstname='udo', lastname='juergens'))
+        session.save(User(id=2, firstname='heino', lastname='n/a'))
+        session.flush()
+        rows = query.order_by(query.table.c.id).all()
+        row1 = rows[0]
+        d = row1.asDict()
+        self.assertEqual(d, {'firstname' : 'udo', 'lastname' : 'juergens', 'id' : 1})
+        
+        transaction.commit()
+
+        rows = query.order_by(query.table.c.id).all()
+        self.assertEqual(len(rows), 2)
+        row1 = rows[0]
+        d = row1.asDict()
+        self.assertEqual(d, {'firstname' : 'udo', 'lastname' : 'juergens', 'id' : 1})
+
+        # bypass the session (and transaction) machinary
+        results = engine.connect().execute(test_users.select())
+        self.assertEqual(len(results.fetchall()), 2)
+
+    def testCommitWithSavepoint(self):
+        if engine.url.drivername in tx.NO_SAVEPOINT_SUPPORT:
+            return
+        session = Session()
+        session.save(User(id=1, firstname='udo', lastname='juergens'))
+        session.save(User(id=2, firstname='heino', lastname='n/a'))
+        session.flush()
+        transaction.commit()
+        
+        session = Session()
+        query = session.query(User)
+        # lets just test that savepoints don't affect commits
+        t = transaction.get()
+        rows = query.order_by(query.table.c.id).all()
+
+        s1 = t.savepoint()
+        session.delete(rows[1])
+        session.flush()
+        transaction.commit()
+
+        # bypass the session machinary
+        results = engine.connect().execute(test_users.select())
+        self.assertEqual(len(results.fetchall()), 1)
+
+    def testTwoPhase(self):
+        session = Session()
+        if not session.twophase:
+            return
+        session.save(User(id=1, firstname='udo', lastname='juergens'))
+        session.save(User(id=2, firstname='heino', lastname='n/a'))
+        session.flush()
+        transaction.commit()
+        
+        # Test that we clean up after a tpc_abort
+        t = transaction.get()
+        
+        def target():
+            return engine.connect().recover_twophase()
+        
+        dummy = DummyDataManager(key='~~~dummy.last', target=target)
+        t.join(dummy)
+        session = Session()
+        query = session.query(User)
+        rows = query.all()
+        session.delete(rows[0])
+        session.flush()
+        result = None
+        try:
+            t.commit()
+        except DummyTargetResult, e:
+            result = e.args[0]
+        except DummyTargetRaised, e:
+            raise e.args[0]
+        
+        self.assertEqual(len(result), 1, "Should have been one prepared transaction when dummy aborted")
+        
+        transaction.begin()
+    
+        self.assertEqual(len(engine.connect().recover_twophase()), 0, "Test no outstanding prepared transactions")
+
+    
+    def testThread(self):
+        transaction.abort()
+        global thread_error
+        thread_error = None
+        def target():
+            try:
+                session = Session()
+                metadata.drop_all(engine)
+                metadata.create_all(engine)
+            
+                query = session.query(User)
+                rows = query.all()
+                self.assertEqual(len(rows), 0)
+
+                session.save(User(id=1, firstname='udo', lastname='juergens'))
+                session.save(User(id=2, firstname='heino', lastname='n/a'))
+                session.flush()
+
+                rows = query.order_by(query.table.c.id).all()
+                self.assertEqual(len(rows), 2)
+                row1 = rows[0]
+                d = row1.asDict()
+                self.assertEqual(d, {'firstname' : 'udo', 'lastname' : 'juergens', 'id' : 1})
+            except Exception, err:
+                global thread_error
+                thread_error = err
+            transaction.abort()
+        
+        thread = threading.Thread(target=target)
+        thread.start()
+        thread.join()
+        if thread_error is not None:
+            raise thread_error # reraise in current thread
+
+
+class MultipleEngineTests(unittest.TestCase):
+        
+    def setUp(self):
+        bound_metadata1.drop_all()
+        bound_metadata1.create_all()
+        bound_metadata2.drop_all()
+        bound_metadata2.create_all()
+    
+    def tearDown(self):
+        transaction.abort()
+        bound_metadata1.drop_all()
+        bound_metadata2.drop_all()
+
+    def testTwoEngines(self):
+        session = UnboundSession()
+        session.save(TestOne(id=1))
+        session.save(TestTwo(id=2))
+        session.flush()
+        transaction.commit()
+        session = UnboundSession()
+        rows = session.query(TestOne).all()
+        self.assertEqual(len(rows), 1)
+        rows = session.query(TestTwo).all()
+        self.assertEqual(len(rows), 1)
+
+def tearDownReadMe(test):
+    Base = test.globs['Base']
+    engine = test.globs['engine']
+    Base.metadata.drop_all(engine)
+
+def test_suite():
+    from unittest import TestSuite, makeSuite
+    import doctest
+    optionflags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS
+    suite = TestSuite()
+    for cls in (ZopeSQLAlchemyTests, MultipleEngineTests):
+        suite.addTest(makeSuite(cls))
+    suite.addTest(doctest.DocFileSuite('README.txt', optionflags=optionflags, tearDown=tearDownReadMe,
+        globs={'TEST_DSN': TEST_DSN, 'TEST_TWOPHASE': TEST_TWOPHASE}))
+    return suite



More information about the Checkins mailing list