[Checkins] SVN: z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/ - major overhoul of the Zope transaction integration: now using

Andreas Jung andreas at andreas-jung.com
Fri Sep 28 01:22:57 EDT 2007


Log message for revision 80268:
    - major overhoul of the Zope transaction integration: now using
      one DataManager for the session object and the connection. The 
      connection as returned through the 'connection' property is also
      used for creating a new 'session'. Older z3c.sqlalchemy version
      used separate connections. This allows applications to use both
      a session and a connection within the same Zope request/thread
      without running into transaction problems. SQL actions and
      session related modifications should happen within the same
      transaction.
  

Changed:
  U   z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/CHANGES.txt
  U   z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py

-=-
Modified: z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/CHANGES.txt
===================================================================
--- z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/CHANGES.txt	2007-09-28 03:51:14 UTC (rev 80267)
+++ z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/CHANGES.txt	2007-09-28 05:22:56 UTC (rev 80268)
@@ -9,6 +9,16 @@
 
   - the unittests support an optional $TEST_DSN environment in order
     to run the test against an existing database (other than SQLite)
+               
+  - major overhoul of the Zope transaction integration: now using
+    one DataManager for the session object and the connection. The 
+    connection as returned through the 'connection' property is also
+    used for creating a new 'session'. Older z3c.sqlalchemy version
+    used separate connections. This allows applications to use both
+    a session and a connection within the same Zope request/thread
+    without running into transaction problems. SQL actions and
+    session related modifications should happen within the same
+    transaction.
 
  
 1.0.11 (30.07.2007)

Modified: z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py
===================================================================
--- z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py	2007-09-28 03:51:14 UTC (rev 80267)
+++ z3c.sqlalchemy/trunk/src/z3c/sqlalchemy/base.py	2007-09-28 05:22:56 UTC (rev 80268)
@@ -11,6 +11,7 @@
 
 import sqlalchemy
 from sqlalchemy.engine.url import make_url
+from sqlalchemy.orm import sessionmaker
 
 from zope.interface import implements
 from zope.component import getUtility
@@ -30,21 +31,25 @@
         self.lock = threading.Lock()
         self.cache = threading.local()
 
-
-    def set(self, **kw):
+    def set(self, id, d):
         self.lock.acquire()
-        for k,v in kw.items():
-            setattr(self.cache, k, v)
+        setattr(self.cache, id, d)
         self.lock.release()
 
-
-    def get(self, *names):
+    def get(self, id):
         self.lock.acquire()
-        result = [getattr(self.cache, name, None) for name in names]
+        result = getattr(self.cache, id, None)
         self.lock.release()
         return result
 
+    def remove(self, id):
+        self.lock.acquire()
+        if hasattr(self.cache, id):
+            delattr(self.cache, id)           
+        self.lock.release()
 
+
+
 class BaseWrapper(object):
 
     implements(ISQLAlchemyWrapper)
@@ -73,7 +78,7 @@
         self.echo = kw.get('echo', False)
         self._model = None
         self._createEngine()
-        self._id = random.random() # used as unique key for session/connection cache
+        self._id = str(random.random()) # used as unique key for session/connection cache
 
         if model:
 
@@ -142,7 +147,6 @@
                                                          transactional=True)
 
 
-session_cache = SynchronizedThreadCache()
 connection_cache = SynchronizedThreadCache()
 
 
@@ -151,25 +155,32 @@
 
     implements(IDataManager)
 
-    def __init__(self, session, id):
+    def __init__(self, connection, session, id, transactional=True):
+
+        self.connection = connection
         self.session = session
+        self.transactional = True
         self._id = id
         self.transaction = None
+        if self.transactional:
+            self.transaction = connection.begin()
 
     def abort(self, trans):
+
         if self.transaction is not None:
             self.transaction.rollback()
         self.session.clear()
-        session_cache.set(**{'last_session_%s' % self._id : None})
+        connection_cache.remove(self._id)
 
     def _flush(self):
+
         # check if the session contains something flushable
         if self.session.new or self.session.deleted or self.session.dirty:
 
             # Check if a session-bound transaction has been created so far.
             # If not, create a new transaction
-            if self.transaction is None:
-                self.transaction = self.session.create_transaction()
+#            if self.transaction is None:
+#                self.transaction = connection.begin()
 
             # Flush
             self.session.flush()
@@ -184,112 +195,70 @@
         self._flush()
 
     def tpc_finish(self, trans):
+
         if self.transaction is not None:
             self.transaction.commit()
+
         self.session.clear()
-        session_cache.set(**{'last_session_%s' % self._id : None})
+        self._cleanup()
+        
 
-
     def tpc_abort(self, trans):
         if self.transaction is not None:
             self.transaction.rollback()
-        self.session.clear()
-        session_cache.set(**{'last_session_%s' % self._id : None})
+        self._cleanup()
 
     def sortKey(self):
-        return 'z3c.sqlalchemy' + str(id(self))
+        return 'z3c.sqlalchemy_' + str(id(self))
 
+    def _cleanup(self):
+        self.session.clear()
+        if self.connection:
+            self.connection.close()
+            self.connection = None
+        connection_cache.remove(self._id)
 
-class ConnectionDataManager(object):
-    """ Wraps connection into transaction context of Zope """
 
-    implements(IDataManager)
 
-    def __init__(self, connection, transactional=True):
-        self.connection = connection
-        self.transactional = transactional
-        self.transaction = connection.begin()
-
-    def abort(self, trans):
-        if self.transactional:
-            self.transaction.rollback()
-        self.connection.close()
-        self.connection = None
-        connection_cache.set(last_connection=None)
-
-    def commit(self, trans):
-        if self.transactional:
-            self.transaction.commit()
-        self.connection.close()
-        self.connection = None
-        connection_cache.set(last_connection=None)
-
-    def tpc_begin(self, trans):
-        pass
-
-    def tpc_vote(self, trans):
-        pass
-
-    def tpc_finish(self, trans):
-        pass
-
-    def tpc_abort(self, trans):
-        pass
-
-    def sortKey(self):
-        return 'z3c.sqlalchemy' + str(id(self))
-
-
 class ZopeBaseWrapper(BaseWrapper):
     """ A wrapper to be used from within Zope. It connects
         the session with the transaction management of Zope.
     """
 
-    @property
-    def session(self):
 
-        last_session, = session_cache.get('last_session_%s' % self._id)
+    def __getOrCreateConnectionCacheItem(self, cache_id):
 
+        cache_item = connection_cache.get(cache_id)
+
         # return cached session if we are within the same transaction
         # and same thread
-        if last_session is not None:
-            return last_session
+        if cache_item is not None:
+            return cache_item
 
         # no cached session, let's create a new one
-        session = self._sessionmaker()
+        connection = self.engine.connect()
+        session = sessionmaker(connection)()
                                           
         # register a DataManager with the current transaction
-        transaction.get().join(SessionDataManager(session, self._id))
+        transaction.get().join(SessionDataManager(connection, session, self._id))
 
         # update thread-local cache
-        session_cache.set(**{'last_session_%s' % self._id : session})
+        cache_item = dict(connection=connection, session=session)
+        connection_cache.set(self._id, cache_item)
+        return cache_item
 
-        # return the session
-        return session 
 
     @property
+    def session(self):
+        """ Return a (cached) session object for the current transaction """
+        return self.__getOrCreateConnectionCacheItem(self._id)['session']
+
+
+    @property
     def connection(self):
         """ This property is _private_ and only intented to be used
             by SQLAlchemyDA and therefore it is not part of the 
             public API. 
         """
 
-        last_connection, = connection_cache.get('last_connection')
-
-        # return cached connection if we are within the same transaction
-        # and same thread
-        if last_connection is not None:
-            return last_connection
-
-        # no cached connection, let's create a new one
-        connection = self.engine.connect()
-                                          
-        # register a DataManager with the current transaction
-        transaction.get().join(ConnectionDataManager(connection, self.transactional))
-
-        # update thread-local cache
-        connection_cache.set(last_connection=connection)
-
-        # return the connection
-        return connection
-
+        return self.__getOrCreateConnectionCacheItem(self._id)['connection']



More information about the Checkins mailing list