[Checkins] SVN: relstorage/trunk/relstorage/ More refactoring: Expanded IConnectionManager and added IOIDAllocator

Shane Hathaway shane at hathawaymix.org
Wed Sep 23 19:14:01 EDT 2009


Log message for revision 104466:
  More refactoring: Expanded IConnectionManager and added IOIDAllocator
  

Changed:
  U   relstorage/trunk/relstorage/adapters/connmanager.py
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  U   relstorage/trunk/relstorage/adapters/loadstore.py
  U   relstorage/trunk/relstorage/adapters/mysql.py
  A   relstorage/trunk/relstorage/adapters/oidallocator.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  U   relstorage/trunk/relstorage/relstorage.py

-=-
Modified: relstorage/trunk/relstorage/adapters/connmanager.py
===================================================================
--- relstorage/trunk/relstorage/adapters/connmanager.py	2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/connmanager.py	2009-09-23 23:14:01 UTC (rev 104466)
@@ -14,6 +14,7 @@
 
 from relstorage.adapters.interfaces import IConnectionManager
 from zope.interface import implements
+from ZODB.POSException import StorageError
 
 class AbstractConnectionManager(object):
     """Abstract base class for connection management.
@@ -22,13 +23,25 @@
     """
     implements(IConnectionManager)
 
+    # disconnected_exceptions contains the exception types that might be
+    # raised when the connection to the database has been broken.
+    disconnected_exceptions = ()
+
     # close_exceptions contains the exception types to ignore
     # when the adapter attempts to close a database connection.
     close_exceptions = ()
 
+    # on_store_opened is either None or a callable that
+    # will be called whenever a store cursor is opened or rolled back.
+    on_store_opened = None
+
+    def set_on_store_opened(self, f):
+        """Set the on_store_opened hook"""
+        self.on_store_opened = f
+
     def open(self):
         """Open a database connection and return (conn, cursor)."""
-        raise NotImplementedError
+        raise NotImplementedError()
 
     def close(self, conn, cursor):
         """Close a connection and cursor, ignoring certain errors.
@@ -61,3 +74,35 @@
         finally:
             self.close(conn, cursor)
 
+    def open_for_load(self):
+        raise NotImplementedError()
+
+    def restart_load(self, conn, cursor):
+        """Reinitialize a connection for loading objects."""
+        try:
+            conn.rollback()
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)
+
+    def open_for_store(self):
+        """Open and initialize a connection for storing objects.
+
+        Returns (conn, cursor).
+        """
+        conn, cursor = self.open()
+        try:
+            if self.on_store_opened is not None:
+                self.on_store_opened(cursor, restart=False)
+            return conn, cursor
+        except:
+            self.close(conn, cursor)
+            raise
+
+    def restart_store(self, conn, cursor):
+        """Reuse a store connection."""
+        try:
+            conn.rollback()
+            if self.on_store_opened is not None:
+                self.on_store_opened(cursor, restart=True)
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)

Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2009-09-23 23:14:01 UTC (rev 104466)
@@ -34,7 +34,31 @@
         then propagates the exception.
         """
 
+    def open_for_load():
+        """Open a connection for loading objects.
 
+        Returns (conn, cursor).
+        """
+
+    def restart_load(conn, cursor):
+        """Reinitialize a connection for loading objects.
+
+        Raise StorageError if the database has disconnected.
+        """
+
+    def open_for_store():
+        """Open and initialize a connection for storing objects.
+
+        Returns (conn, cursor).
+        """
+
+    def restart_store(conn, cursor):
+        """Rollback and reuse a store connection.
+
+        Raise StorageError if the database has disconnected.
+        """
+
+
 class IDatabaseIterator(Interface):
 
     def iter_objects(cursor, tid):
@@ -92,6 +116,15 @@
         """Release the pack lock."""
 
 
+class IOIDAllocator(Interface):
+
+    def set_min_oid(cursor, oid):
+        """Ensure the next OID is at least the given OID."""
+
+    def new_oid(cursor):
+        """Return a new, unused OID."""
+
+
 class IPackUndo(Interface):
 
     def verify_undoable(cursor, undo_tid):

Modified: relstorage/trunk/relstorage/adapters/loadstore.py
===================================================================
--- relstorage/trunk/relstorage/adapters/loadstore.py	2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/loadstore.py	2009-09-23 23:14:01 UTC (rev 104466)
@@ -35,47 +35,19 @@
 
 class HistoryPreservingPostgreSQLLoadStore(object):
 
-    def __init__(self, connmanager, disconnected_exceptions):
-        self.connmanager = connmanager
-        self.disconnected_exceptions = disconnected_exceptions
-
-    def open_for_load(self):
-        """Open and initialize a connection for loading objects.
-
-        Returns (conn, cursor).
-        """
-        conn, cursor = self.connmanager.open(
-            self.connmanager.isolation_serializable)
-        stmt = """
-        PREPARE get_latest_tid AS
-        SELECT tid
-        FROM transaction
-        ORDER BY tid DESC
-        LIMIT 1
-        """
-        cursor.execute(stmt)
-        return conn, cursor
-
-    def restart_load(self, cursor):
-        """Reinitialize a connection for loading objects."""
-        try:
-            cursor.connection.rollback()
-        except self.disconnected_exceptions, e:
-            raise StorageError(e)
-
     def get_current_tid(self, cursor, oid):
         """Returns the current integer tid for an object.
 
         oid is an integer.  Returns None if object does not exist.
         """
-        cursor.execute("""
+        stmt = """
         SELECT tid
         FROM current_object
         WHERE zoid = %s
-        """, (oid,))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()[0]
+        """
+        cursor.execute(stmt, (oid,))
+        for (tid,) in cursor:
+            return tid
         return None
 
     def load_current(self, cursor, oid):
@@ -121,8 +93,11 @@
 
     def exists(self, cursor, oid):
         """Returns a true value if the given object exists."""
-        cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
-        return cursor.rowcount
+        stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
+        cursor.execute(stmt, (oid,))
+        for row in cursor:
+            return True
+        return False
 
     def load_before(self, cursor, oid, tid):
         """Returns the pickle and tid of an object before transaction tid.
@@ -169,7 +144,7 @@
         else:
             return None
 
-    def _make_temp_table(self, cursor):
+    def on_store_opened(self, cursor, restart=False):
         """Create the temporary table for storing objects"""
         stmt = """
         CREATE TEMPORARY TABLE temp_store (
@@ -182,27 +157,6 @@
         """
         cursor.execute(stmt)
 
-    def open_for_store(self):
-        """Open and initialize a connection for storing objects.
-
-        Returns (conn, cursor).
-        """
-        conn, cursor = self.connmanager.open()
-        try:
-            self._make_temp_table(cursor)
-            return conn, cursor
-        except:
-            self.connmanager.close(conn, cursor)
-            raise
-
-    def restart_store(self, cursor):
-        """Reuse a store connection."""
-        try:
-            cursor.connection.rollback()
-            self._make_temp_table(cursor)
-        except self.disconnected_exceptions, e:
-            raise StorageError(e)
-
     def store_temp(self, cursor, oid, prev_tid, data):
         """Store an object in the temporary table."""
         md5sum = compute_md5sum(data)
@@ -302,57 +256,25 @@
         )
         """, {'tid': tid})
 
-    def set_min_oid(self, cursor, oid):
-        """Ensure the next OID is at least the given OID."""
-        cursor.execute("""
-        SELECT CASE WHEN %s > nextval('zoid_seq')
-            THEN setval('zoid_seq', %s)
-            ELSE 0
-            END
-        """, (oid, oid))
 
-    def new_oid(self, cursor):
-        """Return a new, unused OID."""
-        stmt = "SELECT NEXTVAL('zoid_seq')"
-        cursor.execute(stmt)
-        return cursor.fetchone()[0]
-
-
 class HistoryPreservingMySQLLoadStore(object):
 
-    def __init__(self, connmanager, disconnected_exceptions, Binary):
-        self.connmanager = connmanager
-        self.disconnected_exceptions = disconnected_exceptions
+    def __init__(self, Binary):
         self.Binary = Binary
 
-    def open_for_load(self):
-        """Open and initialize a connection for loading objects.
-
-        Returns (conn, cursor).
-        """
-        return self.connmanager.open(
-            self.connmanager.isolation_repeatable_read)
-
-    def restart_load(self, cursor):
-        """Reinitialize a connection for loading objects."""
-        try:
-            cursor.connection.rollback()
-        except self.disconnected_exceptions, e:
-            raise StorageError(e)
-
     def get_current_tid(self, cursor, oid):
         """Returns the current integer tid for an object.
 
         oid is an integer.  Returns None if object does not exist.
         """
-        cursor.execute("""
+        stmt = """
         SELECT tid
         FROM current_object
         WHERE zoid = %s
-        """, (oid,))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()[0]
+        """
+        cursor.execute(stmt, (oid,))
+        for (tid,) in cursor:
+            return tid
         return None
 
     def load_current(self, cursor, oid):
@@ -391,8 +313,11 @@
 
     def exists(self, cursor, oid):
         """Returns a true value if the given object exists."""
-        cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
-        return cursor.rowcount
+        stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
+        cursor.execute(stmt, (oid,))
+        for row in cursor:
+            return True
+        return False
 
     def load_before(self, cursor, oid, tid):
         """Returns the pickle and tid of an object before transaction tid.
@@ -433,8 +358,14 @@
         else:
             return None
 
-    def _make_temp_table(self, cursor):
+    def on_store_opened(self, cursor, restart=False):
         """Create the temporary table for storing objects"""
+        if restart:
+            stmt = """
+            DROP TEMPORARY TABLE IF EXISTS temp_store
+            """
+            cursor.execute(stmt)
+
         stmt = """
         CREATE TEMPORARY TABLE temp_store (
             zoid        BIGINT NOT NULL PRIMARY KEY,
@@ -445,35 +376,6 @@
         """
         cursor.execute(stmt)
 
-    def open_for_store(self):
-        """Open and initialize a connection for storing objects.
-
-        Returns (conn, cursor).
-        """
-        conn, cursor = self.connmanager.open()
-        try:
-            self._make_temp_table(cursor)
-            return conn, cursor
-        except:
-            self.connmanager.close(conn, cursor)
-            raise
-
-    def _restart_temp_table(self, cursor):
-        """Restart the temporary table for storing objects"""
-        stmt = """
-        DROP TEMPORARY TABLE IF EXISTS temp_store
-        """
-        cursor.execute(stmt)
-        self._make_temp_table(cursor)
-
-    def restart_store(self, cursor):
-        """Reuse a store connection."""
-        try:
-            cursor.connection.rollback()
-            self._restart_temp_table(cursor)
-        except self.disconnected_exceptions, e:
-            raise StorageError(e)
-
     def store_temp(self, cursor, oid, prev_tid, data):
         """Store an object in the temporary table."""
         md5sum = compute_md5sum(data)
@@ -561,59 +463,26 @@
         WHERE tid = %s
         """, (tid,))
 
-    def set_min_oid(self, cursor, oid):
-        """Ensure the next OID is at least the given OID."""
-        cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
 
-    def new_oid(self, cursor):
-        """Return a new, unused OID."""
-        stmt = "INSERT INTO new_oid VALUES ()"
-        cursor.execute(stmt)
-        oid = cursor.connection.insert_id()
-        if oid % 100 == 0:
-            # Clean out previously generated OIDs.
-            stmt = "DELETE FROM new_oid WHERE zoid < %s"
-            cursor.execute(stmt, (oid,))
-        return oid
-
-
 class HistoryPreservingOracleLoadStore(object):
 
-    def __init__(self, connmanager, runner, disconnected_exceptions,
-            Binary, inputsize_BLOB, inputsize_BINARY, twophase):
-        self.connmanager = connmanager
+    def __init__(self, runner, Binary, inputsize_BLOB, inputsize_BINARY):
         self.runner = runner
-        self.disconnected_exceptions = disconnected_exceptions
         self.Binary = Binary
         self.inputsize_BLOB = inputsize_BLOB
         self.inputsize_BINARY = inputsize_BINARY
-        self.twophase = twophase
 
-    def open_for_load(self):
-        """Open and initialize a connection for loading objects.
-
-        Returns (conn, cursor).
-        """
-        return self.connmanager.open(self.connmanager.isolation_read_only)
-
-    def restart_load(self, cursor):
-        """Reinitialize a connection for loading objects."""
-        try:
-            cursor.connection.rollback()
-            cursor.execute("SET TRANSACTION READ ONLY")
-        except self.disconnected_exceptions, e:
-            raise StorageError(e)
-
     def get_current_tid(self, cursor, oid):
         """Returns the current integer tid for an object.
 
         oid is an integer.  Returns None if object does not exist.
         """
-        cursor.execute("""
+        stmt = """
         SELECT tid
         FROM current_object
         WHERE zoid = :1
-        """, (oid,))
+        """
+        cursor.execute(stmt, (oid,))
         for (tid,) in cursor:
             return tid
         return None
@@ -649,8 +518,11 @@
 
     def exists(self, cursor, oid):
         """Returns a true value if the given object exists."""
-        cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
-        return len(list(cursor))
+        stmt = "SELECT 1 FROM current_object WHERE zoid = :1"
+        cursor.execute(stmt, (oid,))
+        for row in cursor:
+            return True
+        return False
 
     def load_before(self, cursor, oid, tid):
         """Returns the pickle and tid of an object before transaction tid.
@@ -690,41 +562,8 @@
         else:
             return None
 
-    def _set_xid(self, cursor):
-        """Set up a distributed transaction"""
-        stmt = """
-        SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
-        """
-        cursor.execute(stmt)
-        xid = str(cursor.fetchone()[0])
-        cursor.connection.begin(0, xid, '0')
+    on_store_opened = None  # no store connection initialization needed
 
-    def open_for_store(self):
-        """Open and initialize a connection for storing objects.
-
-        Returns (conn, cursor).
-        """
-        if self.twophase:
-            conn, cursor = self.connmanager.open(
-                transaction_mode=None, twophase=True)
-            try:
-                self._set_xid(cursor)
-            except:
-                self.close(conn, cursor)
-                raise
-        else:
-            conn, cursor = self.connmanager.open()
-        return conn, cursor
-
-    def restart_store(self, cursor):
-        """Reuse a store connection."""
-        try:
-            cursor.connection.rollback()
-            if self.twophase:
-                self._set_xid(cursor)
-        except self.disconnected_exceptions, e:
-            raise StorageError(e)
-
     def store_temp(self, cursor, oid, prev_tid, data):
         """Store an object in the temporary table."""
         md5sum = compute_md5sum(data)
@@ -838,30 +677,3 @@
         """
         cursor.execute(stmt, (tid,))
 
-    def set_min_oid(self, cursor, oid):
-        """Ensure the next OID is at least the given OID."""
-        next_oid = self.new_oid(cursor)
-        if next_oid < oid:
-            # Oracle provides no way modify the sequence value
-            # except through alter sequence or drop/create sequence,
-            # but either statement kills the current transaction.
-            # Therefore, open a temporary connection to make the
-            # alteration.
-            conn2, cursor2 = self.connmanager.open()
-            try:
-                # Change the sequence by altering the increment.
-                # (this is safer than dropping and re-creating the sequence)
-                diff = oid - next_oid
-                cursor2.execute(
-                    "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
-                cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
-                cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
-                conn2.commit()
-            finally:
-                self.connmanager.close(conn2, cursor2)
-
-    def new_oid(self, cursor):
-        """Return a new, unused OID."""
-        stmt = "SELECT zoid_seq.nextval FROM DUAL"
-        cursor.execute(stmt)
-        return cursor.fetchone()[0]

Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2009-09-23 23:14:01 UTC (rev 104466)
@@ -55,6 +55,7 @@
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
 from relstorage.adapters.loadstore import HistoryPreservingMySQLLoadStore
 from relstorage.adapters.locker import MySQLLocker
+from relstorage.adapters.oidallocator import MySQLOIDAllocator
 from relstorage.adapters.packundo import HistoryPreservingPackUndo
 from relstorage.adapters.poller import Poller
 from relstorage.adapters.schema import HistoryPreservingMySQLSchema
@@ -68,7 +69,11 @@
 # raised when the connection to the database has been broken.
 disconnected_exceptions = (MySQLdb.OperationalError, MySQLdb.InterfaceError)
 
+# close_exceptions contains the exception types to ignore
+# when the adapter attempts to close a database connection.
+close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
 
+
 class MySQLAdapter(object):
     """MySQL adapter for RelStorage."""
 
@@ -83,10 +88,10 @@
             runner=self.runner,
             )
         self.loadstore = HistoryPreservingMySQLLoadStore(
-            connmanager=self.connmanager,
-            disconnected_exceptions=disconnected_exceptions,
             Binary=MySQLdb.Binary,
             )
+        self.oidallocator = MySQLOIDAllocator()
+        self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
         self.txncontrol = MySQLTransactionControl(
             Binary=MySQLdb.Binary,
             )
@@ -109,6 +114,10 @@
 
         self.open = self.connmanager.open
         self.close = self.connmanager.close
+        self.open_for_load = self.connmanager.open_for_load
+        self.restart_load = self.connmanager.restart_load
+        self.open_for_store = self.connmanager.open_for_store
+        self.restart_store = self.connmanager.restart_store
 
         self.hold_commit_lock = self.locker.hold_commit_lock
         self.release_commit_lock = self.locker.release_commit_lock
@@ -120,26 +129,22 @@
         self.zap_all = self.schema.zap_all
         self.drop_all = self.schema.drop_all
 
-        self.open_for_load = self.loadstore.open_for_load
-        self.restart_load = self.loadstore.restart_load
         self.get_current_tid = self.loadstore.get_current_tid
         self.load_current = self.loadstore.load_current
         self.load_revision = self.loadstore.load_revision
         self.exists = self.loadstore.exists
         self.load_before = self.loadstore.load_before
         self.get_object_tid_after = self.loadstore.get_object_tid_after
-
-        self.open_for_store = self.loadstore.open_for_store
-        self.restart_store = self.loadstore.restart_store
         self.store_temp = self.loadstore.store_temp
         self.replace_temp = self.loadstore.replace_temp
         self.restore = self.loadstore.restore
         self.detect_conflict = self.loadstore.detect_conflict
         self.move_from_temp = self.loadstore.move_from_temp
         self.update_current = self.loadstore.update_current
-        self.set_min_oid = self.loadstore.set_min_oid
-        self.new_oid = self.loadstore.new_oid
 
+        self.set_min_oid = self.oidallocator.set_min_oid
+        self.new_oid = self.oidallocator.new_oid
+
         self.get_tid_and_time = self.txncontrol.get_tid_and_time
         self.add_transaction = self.txncontrol.add_transaction
         self.commit_phase1 = self.txncontrol.commit_phase1
@@ -170,9 +175,8 @@
     isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
     isolation_repeatable_read = "ISOLATION LEVEL REPEATABLE READ"
 
-    # close_exceptions contains the exception types to ignore
-    # when the adapter attempts to close a database connection.
-    close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
+    disconnected_exceptions = disconnected_exceptions
+    close_exceptions = close_exceptions
 
     def __init__(self, params):
         self._params = params.copy()
@@ -192,3 +196,10 @@
             log.warning("Unable to connect: %s", e)
             raise
 
+    def open_for_load(self):
+        """Open and initialize a connection for loading objects.
+
+        Returns (conn, cursor).
+        """
+        return self.open(self.isolation_repeatable_read)
+

Added: relstorage/trunk/relstorage/adapters/oidallocator.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oidallocator.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/oidallocator.py	2009-09-23 23:14:01 UTC (rev 104466)
@@ -0,0 +1,90 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""IOIDAllocator implementations"""
+
+from relstorage.adapters.interfaces import IOIDAllocator
+from zope.interface import implements
+
+class PostgreSQLOIDAllocator(object):
+    implements(IOIDAllocator)
+
+    def set_min_oid(self, cursor, oid):
+        """Ensure the next OID is at least the given OID."""
+        cursor.execute("""
+        SELECT CASE WHEN %s > nextval('zoid_seq')
+            THEN setval('zoid_seq', %s)
+            ELSE 0
+            END
+        """, (oid, oid))
+
+    def new_oid(self, cursor):
+        """Return a new, unused OID."""
+        stmt = "SELECT NEXTVAL('zoid_seq')"
+        cursor.execute(stmt)
+        return cursor.fetchone()[0]
+
+
+class MySQLOIDAllocator(object):
+    implements(IOIDAllocator)
+
+    def set_min_oid(self, cursor, oid):
+        """Ensure the next OID is at least the given OID."""
+        cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
+
+    def new_oid(self, cursor):
+        """Return a new, unused OID."""
+        stmt = "INSERT INTO new_oid VALUES ()"
+        cursor.execute(stmt)
+        oid = cursor.connection.insert_id()
+        if oid % 100 == 0:
+            # Clean out previously generated OIDs.
+            stmt = "DELETE FROM new_oid WHERE zoid < %s"
+            cursor.execute(stmt, (oid,))
+        return oid
+
+
+class OracleOIDAllocator(object):
+    implements(IOIDAllocator)
+
+    def __init__(self, connmanager):
+        self.connmanager = connmanager
+
+    def set_min_oid(self, cursor, oid):
+        """Ensure the next OID is at least the given OID."""
+        next_oid = self.new_oid(cursor)
+        if next_oid < oid:
+            # Oracle provides no way to modify the sequence value
+            # except through alter sequence or drop/create sequence,
+            # but either statement kills the current transaction.
+            # Therefore, open a temporary connection to make the
+            # alteration.
+            conn2, cursor2 = self.connmanager.open()
+            try:
+                # Change the sequence by altering the increment.
+                # (this is safer than dropping and re-creating the sequence)
+                diff = oid - next_oid
+                cursor2.execute(
+                    "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
+                cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
+                cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
+                conn2.commit()
+            finally:
+                self.connmanager.close(conn2, cursor2)
+
+    def new_oid(self, cursor):
+        """Return a new, unused OID."""
+        stmt = "SELECT zoid_seq.nextval FROM DUAL"
+        cursor.execute(stmt)
+        return cursor.fetchone()[0]
+

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2009-09-23 23:14:01 UTC (rev 104466)
@@ -15,11 +15,13 @@
 
 import logging
 import cx_Oracle
+from ZODB.POSException import StorageError
 
 from relstorage.adapters.connmanager import AbstractConnectionManager
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
 from relstorage.adapters.loadstore import HistoryPreservingOracleLoadStore
 from relstorage.adapters.locker import OracleLocker
+from relstorage.adapters.oidallocator import OracleOIDAllocator
 from relstorage.adapters.packundo import OracleHistoryPreservingPackUndo
 from relstorage.adapters.poller import Poller
 from relstorage.adapters.schema import HistoryPreservingOracleSchema
@@ -31,9 +33,15 @@
 
 # disconnected_exceptions contains the exception types that might be
 # raised when the connection to the database has been broken.
-disconnected_exceptions = (cx_Oracle.OperationalError,
-    cx_Oracle.InterfaceError, cx_Oracle.DatabaseError)
+disconnected_exceptions = (
+    cx_Oracle.OperationalError,
+    cx_Oracle.InterfaceError,
+    cx_Oracle.DatabaseError,
+    )
 
+# close_exceptions contains the exception types to ignore
+# when the adapter attempts to close a database connection.
+close_exceptions = disconnected_exceptions
 
 class OracleAdapter(object):
     """Oracle adapter for RelStorage."""
@@ -70,14 +78,16 @@
             runner=self.runner,
             )
         self.loadstore = HistoryPreservingOracleLoadStore(
-            connmanager=self.connmanager,
             runner=self.runner,
-            disconnected_exceptions=disconnected_exceptions,
             Binary=cx_Oracle.Binary,
             inputsize_BLOB=cx_Oracle.BLOB,
             inputsize_BINARY=cx_Oracle.BINARY,
             twophase=bool(twophase),
             )
+        self.oidallocator = OracleOIDAllocator(
+            connmanager=self.connmanager,
+            )
+        self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
         self.txncontrol = OracleTransactionControl(
             Binary=cx_Oracle.Binary,
             )
@@ -100,6 +110,10 @@
 
         self.open = self.connmanager.open
         self.close = self.connmanager.close
+        self.open_for_load = self.connmanager.open_for_load
+        self.restart_load = self.connmanager.restart_load
+        self.open_for_store = self.connmanager.open_for_store
+        self.restart_store = self.connmanager.restart_store
 
         self.hold_commit_lock = self.locker.hold_commit_lock
         self.release_commit_lock = self.locker.release_commit_lock
@@ -111,26 +125,22 @@
         self.zap_all = self.schema.zap_all
         self.drop_all = self.schema.drop_all
 
-        self.open_for_load = self.loadstore.open_for_load
-        self.restart_load = self.loadstore.restart_load
         self.get_current_tid = self.loadstore.get_current_tid
         self.load_current = self.loadstore.load_current
         self.load_revision = self.loadstore.load_revision
         self.exists = self.loadstore.exists
         self.load_before = self.loadstore.load_before
         self.get_object_tid_after = self.loadstore.get_object_tid_after
-
-        self.open_for_store = self.loadstore.open_for_store
-        self.restart_store = self.loadstore.restart_store
         self.store_temp = self.loadstore.store_temp
         self.replace_temp = self.loadstore.replace_temp
         self.restore = self.loadstore.restore
         self.detect_conflict = self.loadstore.detect_conflict
         self.move_from_temp = self.loadstore.move_from_temp
         self.update_current = self.loadstore.update_current
-        self.set_min_oid = self.loadstore.set_min_oid
-        self.new_oid = self.loadstore.new_oid
 
+        self.set_min_oid = self.oidallocator.set_min_oid
+        self.new_oid = self.oidallocator.new_oid
+
         self.get_tid_and_time = self.txncontrol.get_tid_and_time
         self.add_transaction = self.txncontrol.add_transaction
         self.commit_phase1 = self.txncontrol.commit_phase1
@@ -222,11 +232,13 @@
     isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
     isolation_read_only = "READ ONLY"
 
-    close_exceptions = disconnected_exceptions
+    disconnected_exceptions = disconnected_exceptions
+    close_exceptions = close_exceptions
 
-    def __init__(self, params, arraysize):
+    def __init__(self, params, arraysize, twophase):
         self._params = params
         self._arraysize = arraysize
+        self._twophase = twophase
 
     def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
             twophase=False):
@@ -243,3 +255,58 @@
         except cx_Oracle.OperationalError, e:
             log.warning("Unable to connect: %s", e)
             raise
+
+    def open_for_load(self):
+        """Open and initialize a connection for loading objects.
+
+        Returns (conn, cursor).
+        """
+        return self.open(self.isolation_read_only)
+
+    def restart_load(self, conn, cursor):
+        """Roll back a load connection and make it read-only again."""
+        try:
+            conn.rollback()
+            cursor.execute("SET TRANSACTION READ ONLY")
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)  # report disconnects uniformly
+
+    def _set_xid(self, conn, cursor):
+        """Set up a distributed transaction"""
+        stmt = """
+        SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+        """
+        cursor.execute(stmt)
+        xid = str(cursor.fetchone()[0])
+        conn.begin(0, xid, '0')
+
+    def open_for_store(self):
+        """Open and initialize a connection for storing objects.
+
+        Returns (conn, cursor).  The connection is closed on setup failure.
+        """
+        if self._twophase:
+            conn, cursor = self.open(transaction_mode=None, twophase=True)
+        else:
+            conn, cursor = self.open()
+        try:  # conn exists from here on, so cleanup below is always safe
+            if self._twophase:
+                self._set_xid(conn, cursor)
+            if self.on_store_opened is not None:
+                self.on_store_opened(cursor, restart=False)
+            return conn, cursor
+        except:
+            self.close(conn, cursor)
+            raise
+
+    def restart_store(self, conn, cursor):
+        """Reuse a store connection."""
+        try:
+            conn.rollback()
+            if self._twophase:
+                self._set_xid(conn, cursor)
+            if self.on_store_opened is not None:
+                self.on_store_opened(cursor, restart=True)
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)
+

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2009-09-23 23:14:01 UTC (rev 104466)
@@ -21,6 +21,7 @@
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
 from relstorage.adapters.loadstore import HistoryPreservingPostgreSQLLoadStore
 from relstorage.adapters.locker import PostgreSQLLocker
+from relstorage.adapters.oidallocator import PostgreSQLOIDAllocator
 from relstorage.adapters.packundo import HistoryPreservingPackUndo
 from relstorage.adapters.poller import Poller
 from relstorage.adapters.schema import HistoryPreservingPostgreSQLSchema
@@ -32,8 +33,14 @@
 
 # disconnected_exceptions contains the exception types that might be
 # raised when the connection to the database has been broken.
-disconnected_exceptions = (psycopg2.OperationalError, psycopg2.InterfaceError)
+disconnected_exceptions = (
+    psycopg2.OperationalError,
+    psycopg2.InterfaceError,
+    )
 
+# close_exceptions contains the exception types to ignore
+# when the adapter attempts to close a database connection.
+close_exceptions = disconnected_exceptions
 
 class PostgreSQLAdapter(object):
     """PostgreSQL adapter for RelStorage."""
@@ -48,10 +55,9 @@
             locker=self.locker,
             connmanager=self.connmanager,
             )
-        self.loadstore = HistoryPreservingPostgreSQLLoadStore(
-            connmanager=self.connmanager,
-            disconnected_exceptions=disconnected_exceptions,
-            )
+        self.loadstore = HistoryPreservingPostgreSQLLoadStore()
+        self.oidallocator = PostgreSQLOIDAllocator()
+        self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
         self.txncontrol = PostgreSQLTransactionControl()
         self.poller = Poller(
             poll_query="EXECUTE get_latest_tid",
@@ -72,6 +78,10 @@
 
         self.open = self.connmanager.open
         self.close = self.connmanager.close
+        self.open_for_load = self.connmanager.open_for_load
+        self.restart_load = self.connmanager.restart_load
+        self.open_for_store = self.connmanager.open_for_store
+        self.restart_store = self.connmanager.restart_store
 
         self.hold_commit_lock = self.locker.hold_commit_lock
         self.release_commit_lock = self.locker.release_commit_lock
@@ -83,26 +93,22 @@
         self.zap_all = self.schema.zap_all
         self.drop_all = self.schema.drop_all
 
-        self.open_for_load = self.loadstore.open_for_load
-        self.restart_load = self.loadstore.restart_load
         self.get_current_tid = self.loadstore.get_current_tid
         self.load_current = self.loadstore.load_current
         self.load_revision = self.loadstore.load_revision
         self.exists = self.loadstore.exists
         self.load_before = self.loadstore.load_before
         self.get_object_tid_after = self.loadstore.get_object_tid_after
-
-        self.open_for_store = self.loadstore.open_for_store
-        self.restart_store = self.loadstore.restart_store
         self.store_temp = self.loadstore.store_temp
         self.replace_temp = self.loadstore.replace_temp
         self.restore = self.loadstore.restore
         self.detect_conflict = self.loadstore.detect_conflict
         self.move_from_temp = self.loadstore.move_from_temp
         self.update_current = self.loadstore.update_current
-        self.set_min_oid = self.loadstore.set_min_oid
-        self.new_oid = self.loadstore.new_oid
 
+        self.set_min_oid = self.oidallocator.set_min_oid
+        self.new_oid = self.oidallocator.new_oid
+
         self.get_tid_and_time = self.txncontrol.get_tid_and_time
         self.add_transaction = self.txncontrol.add_transaction
         self.commit_phase1 = self.txncontrol.commit_phase1
@@ -135,7 +141,8 @@
     isolation_serializable = (
         psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
 
-    close_exceptions = disconnected_exceptions
+    disconnected_exceptions = disconnected_exceptions
+    close_exceptions = close_exceptions
 
     def __init__(self, dsn):
         self._dsn = dsn
@@ -153,3 +160,19 @@
             raise
         return conn, cursor
 
+    def open_for_load(self):
+        """Open and initialize a connection for loading objects.
+
+        The connection is opened with the serializable isolation level
+        and the get_latest_tid statement is prepared on it.  If
+        preparing the statement fails, the connection is closed and
+        the exception is re-raised.
+
+        Returns (conn, cursor).
+        """
+        conn, cursor = self.open(self.isolation_serializable)
+        try:
+            stmt = """
+        PREPARE get_latest_tid AS
+        SELECT tid
+        FROM transaction
+        ORDER BY tid DESC
+        LIMIT 1
+        """
+            cursor.execute(stmt)
+        except:
+            # Do not leak the freshly opened connection when the
+            # PREPARE fails; close it and let the error propagate.
+            self.close(conn, cursor)
+            raise
+        return conn, cursor
+

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/relstorage.py	2009-09-23 23:14:01 UTC (rev 104466)
@@ -195,7 +195,8 @@
             self._open_load_connection()
         else:
             try:
-                self._adapter.restart_load(self._load_cursor)
+                self._adapter.restart_load(
+                    self._load_conn, self._load_cursor)
             except POSException.StorageError, e:
                 log.warning("Reconnecting load_conn: %s", e)
                 self._drop_load_connection()
@@ -227,7 +228,8 @@
             self._open_store_connection()
         else:
             try:
-                self._adapter.restart_store(self._store_cursor)
+                self._adapter.restart_store(
+                    self._store_conn, self._store_cursor)
             except POSException.StorageError, e:
                 log.warning("Reconnecting store_conn: %s", e)
                 self._drop_store_connection()



More information about the checkins mailing list