[Checkins]
SVN: z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py.debug
containing debug code
Andreas Jung
andreas at andreas-jung.com
Tue May 1 05:01:35 EDT 2007
Log message for revision 74955:
containing debug code
Changed:
A z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py.debug
-=-
Added: z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py.debug
===================================================================
--- z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py.debug 2007-05-01 07:32:56 UTC (rev 74954)
+++ z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py.debug 2007-05-01 09:01:30 UTC (rev 74955)
@@ -0,0 +1,261 @@
+##########################################################################
+# z3c.sqlalchemy - A SQLAlchemy wrapper for Python/Zope
+#
+# (C) Zope Corporation and Contributor
+# Written by Andreas Jung for Haufe Mediengruppe, Freiburg, Germany
+# and ZOPYX Ltd. & Co. KG, Tuebingen, Germany
+##########################################################################
+
+import threading
+
+import sqlalchemy
+from sqlalchemy.engine.url import make_url
+
+from zope.interface import implements
+from zope.component import getUtility
+from zope.component.interfaces import ComponentLookupError
+
+from z3c.sqlalchemy.interfaces import ISQLAlchemyWrapper, IModelProvider
+from z3c.sqlalchemy.model import Model
+from z3c.sqlalchemy.mapper import LazyMapperCollection
+
+import transaction
+from transaction.interfaces import IDataManager
+
+def log(s):
+ print '*** %s - %s' % (threading.currentThread(), s)
+
+
+class BaseWrapper(object):
+
+ implements(ISQLAlchemyWrapper)
+
+ def __init__(self, dsn, model=None, **kw):
+ """ 'dsn' - a RFC-1738-style connection string
+
+ 'model' - optional instance of model.Model
+
+ 'kw' - optional keyword arguments passed to create_engine()
+ """
+
+ self.dsn = dsn
+ self.url = make_url(dsn)
+ self.host = self.url.host
+ self.port = self.url.port
+ self.username = self.url.username
+ self.password = self.url.password
+ self.dbname = self.url.database
+ self.drivername = self.url.drivername
+ self.kw = kw
+ self.echo = kw.get('echo', False)
+ self._engine = self._createEngine()
+ self._engine.echo = self.echo
+ self._model = None
+
+
+ if model:
+
+ if isinstance(model, Model):
+ self._model = model
+
+ elif isinstance(model, basestring):
+
+ try:
+ util = getUtility(IModelProvider, model)
+ except ComponentLookupError:
+ raise ComponentLookupError("No named utility '%s' providing IModelProvider found" % model)
+
+
+ self._model = util.getModel(self.metadata)
+
+ elif callable(model):
+
+ self._model = model(self.metadata)
+
+
+ else:
+ raise ValueError("The 'model' parameter passed to constructor must either be "\
+ "the name of a named utility implementing IModelProvider or "\
+ "an instance of z3c.sqlalchemy.model.Model.")
+
+ if not isinstance(self._model, Model):
+ raise TypeError('_model is not an instance of model.Model')
+
+
+ # mappers must be initialized at last since we need to acces
+ # the 'model' from within the constructor of LazyMapperCollection
+ self._mappers = LazyMapperCollection(self)
+
+ @property
+ def metadata(self):
+ return sqlalchemy.BoundMetaData(self._engine)
+
+ @property
+ def session(self):
+ return sqlalchemy.create_session(self._engine)
+
+ def registerMapper(self, mapper, name):
+ self._mappers.registerMapper(mapper, name)
+
+ def getMapper(self, tablename, schema='public'):
+ return self._mappers.getMapper(tablename, schema)
+
+ def getMappers(self, *names):
+ return tuple([self.getMapper(name) for name in names])
+
+ @property
+ def engine(self):
+ """ only for private purposes! """
+ return self._engine
+
+ @property
+ def model(self):
+ """ only for private purposes! """
+ return self._model
+
+ def _createEngine(self):
+ return sqlalchemy.create_engine(self.dsn, **self.kw)
+
+# Thread-local caches used by ZopeBaseWrapper. Per thread they hold the
+# string form of the last transaction seen ('last_transaction') together
+# with the session ('last_session') resp. connection ('last_connection')
+# that was created for that transaction.
+_session_cache = threading.local() # module-level cache
+_connection_cache = threading.local() # module-level cache
+
+
+class SessionDataManager(object):
+ """ Wraps session into transaction context of Zope """
+
+ implements(IDataManager)
+
+ def __init__(self, session):
+ self.session = session
+ self.transaction = session.create_transaction()
+
+ def tpc_begin(self, trans):
+ pass
+
+ def abort(self, trans):
+ self.transaction.rollback()
+
+ def commit(self, trans):
+ self.session.flush()
+ self.transaction.commit()
+
+ def tpc_vote(self, trans):
+ pass
+
+ def tpc_finish(self, trans):
+ pass
+
+ def tpc_abort(self, trans):
+ pass
+
+ def sortKey(self):
+ return str(id(self))
+
+
+class ConnectionDataManager(object):
+ """ Wraps connection into transaction context of Zope """
+
+ implements(IDataManager)
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.transaction = connection.begin()
+
+ def tpc_begin(self, trans):
+ log('tpc_begin() - %s' % trans)
+ pass
+
+ def abort(self, trans):
+ self.transaction.rollback()
+ self.connection.close()
+ self.connection = None
+ log('abort() - %s' % trans)
+
+ def commit(self, trans):
+ self.transaction.commit()
+ log('commit() - %s' % trans)
+ self.connection.close()
+ self.connection = None
+
+ def tpc_vote(self, trans):
+ pass
+
+ def tpc_finish(self, trans):
+ log('tcp_finish() - %s' % trans)
+ pass
+
+ def tpc_abort(self, trans):
+ log('tcp_abort() - %s' % trans)
+ pass
+
+ def sortKey(self):
+ return str(id(self))
+
+
+class ZopeBaseWrapper(BaseWrapper):
+ """ A wrapper to be used from within Zope. It connects
+ the session with the transaction management of Zope.
+ """
+
+ @property
+ def session(self):
+
+ if not hasattr(_session_cache, 'last_transaction'):
+ _session_cache.last_transaction = None
+ _session_cache.last_session = None
+
+ # get current transaction
+ txn = transaction.get()
+ txn_str = str(txn)
+
+ # return cached session if we are within the same transaction
+ # and same thread
+ if txn_str == _session_cache.last_transaction:
+ return _session_cache.last_session
+
+ # no cached session, let's create a new one
+ session = sqlalchemy.create_session(self._engine)
+
+ # register a DataManager with the current transaction
+ txn.join(SessionDataManager(session))
+
+ # update thread-local cache
+ _session_cache.last_transaction = txn_str
+ _session_cache.last_session = session
+
+ # return the session
+ return session
+
+ @property
+ def connection(self):
+
+ if not hasattr(_connection_cache, 'last_connection'):
+ _connection_cache.last_transaction = None
+ _connection_cache.last_connection = None
+
+ # get current transaction
+ txn = transaction.get()
+ txn_str = str(txn)
+ log('current thread - %s' % threading.currentThread())
+ log('checking for transaction - %s' % txn_str)
+
+ # return cached connection if we are within the same transaction
+ # and same thread
+ if txn_str == _connection_cache.last_transaction:
+ log('returning cached connection - %s' % _connection_cache.last_connection)
+ return _connection_cache.last_connection
+
+ # no cached connection, let's create a new one
+ connection = self.engine.connect()
+ log('creating new connection - %s' % connection)
+
+ # register a DataManager with the current transaction
+ txn.join(ConnectionDataManager(connection))
+
+ # update thread-local cache
+ _connection_cache.last_transaction = txn_str
+ _connection_cache.last_connection = connection
+
+ # return the connection
+ return connection
+
More information about the Checkins
mailing list