[Checkins] SVN: relstorage/trunk/relstorage/ Leave store connections open after commit. This makes Oracle much faster and probably also helps PostgreSQL.

Shane Hathaway shane at hathawaymix.org
Tue Feb 5 04:30:30 EST 2008


Log message for revision 83524:
  Leave store connections open after commit so that the next transaction can reuse them.  This makes Oracle much faster and probably also helps PostgreSQL.
  Also corrected a theoretical source of corruption discovered by Dieter Maurer.
  At commit, we now execute 'LOCK TABLE current_object IN SHARE MODE' before detecting conflicts, so that conflict detection has the most current data.
  

Changed:
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  U   relstorage/trunk/relstorage/relstorage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2008-02-05 07:08:27 UTC (rev 83523)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2008-02-05 09:30:29 UTC (rev 83524)
@@ -353,6 +353,15 @@
         else:
             return None
 
+    def _set_xid(self, cursor):
+        """Set up a distributed transaction"""
+        stmt = """
+        SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+        """
+        cursor.execute(stmt)
+        xid = str(cursor.fetchone()[0])
+        cursor.connection.begin(0, xid, '0')
+
     def open_for_store(self):
         """Open and initialize a connection for storing objects.
 
@@ -361,12 +370,7 @@
         if self._twophase:
             conn, cursor = self.open(transaction_mode=None, twophase=True)
             try:
-                stmt = """
-                SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
-                """
-                cursor.execute(stmt)
-                xid = str(cursor.fetchone()[0])
-                conn.begin(0, xid, '0')
+                self._set_xid(cursor)
             except:
                 self.close(conn, cursor)
                 raise
@@ -374,6 +378,15 @@
             conn, cursor = self.open()
         return conn, cursor
 
+    def restart_store(self, cursor):
+        """Reuse a store connection."""
+        try:
+            cursor.connection.rollback()
+            if self._twophase:
+                self._set_xid(cursor)
+        except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
+            raise StorageError("database disconnected")
+
     def store_temp(self, cursor, oid, prev_tid, md5sum, data):
         """Store an object in the temporary table."""
         cursor.setinputsizes(data=cx_Oracle.BLOB)
@@ -399,13 +412,17 @@
 
     def start_commit(self, cursor):
         """Prepare to commit."""
+        # Hold commit_lock to prevent concurrent commits
+        # (for as short a time as possible).
+        # Lock current_object in share mode to ensure conflict
+        # detection has the most current data.
+        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+        cursor.execute("LOCK TABLE current_object IN SHARE MODE")
         cursor.execute("SAVEPOINT start_commit")
-        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
 
     def restart_commit(self, cursor):
         """Rollback the attempt to commit and start over."""
         cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
-        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
 
     def _parse_dsinterval(self, s):
         """Convert an Oracle dsinterval (as a string) to a float."""
@@ -542,7 +559,7 @@
         FROM transaction
         WHERE packed = 'N'
             AND tid != 0
-        ORDER BY tid desc
+        ORDER BY tid DESC
         """
         cursor.execute(stmt)
         return iter(cursor)
@@ -568,7 +585,7 @@
             JOIN object_state USING (tid)
         WHERE zoid = :1
             AND packed = 'N'
-        ORDER BY tid desc
+        ORDER BY tid DESC
         """
         cursor.execute(stmt, (oid,))
         return iter(cursor)

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2008-02-05 07:08:27 UTC (rev 83523)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2008-02-05 09:30:29 UTC (rev 83524)
@@ -291,7 +291,7 @@
         FROM object_state
         WHERE zoid = %s
             AND tid < %s
-        ORDER BY tid desc
+        ORDER BY tid DESC
         LIMIT 1
         """, (oid, tid))
         if cursor.rowcount:
@@ -326,6 +326,32 @@
         else:
             return None
 
+    def _make_temp_table(self, cursor):
+        """Create the temporary table for storing objects"""
+        if self._twophase:
+            # PostgreSQL does not allow two phase transactions
+            # to use temporary tables. :-(
+            stmt = """
+            CREATE TABLE temp_store (
+                zoid        BIGINT NOT NULL,
+                prev_tid    BIGINT NOT NULL,
+                md5         CHAR(32),
+                state       BYTEA
+            );
+            CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+            """
+        else:
+            stmt = """
+            CREATE TEMPORARY TABLE temp_store (
+                zoid        BIGINT NOT NULL,
+                prev_tid    BIGINT NOT NULL,
+                md5         CHAR(32),
+                state       BYTEA
+            ) ON COMMIT DROP;
+            CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+            """
+        cursor.execute(stmt)
+
     def open_for_store(self):
         """Open and initialize a connection for storing objects.
 
@@ -333,36 +359,25 @@
         """
         conn, cursor = self.open()
         try:
-            if self._twophase:
-                # PostgreSQL does not allow two phase transactions
-                # to use temporary tables. :-(
-                stmt = """
-                CREATE TABLE temp_store (
-                    zoid        BIGINT NOT NULL,
-                    prev_tid    BIGINT NOT NULL,
-                    md5         CHAR(32),
-                    state       BYTEA
-                );
-                CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
-                """
-            else:
-                stmt = """
-                CREATE TEMPORARY TABLE temp_store (
-                    zoid        BIGINT NOT NULL,
-                    prev_tid    BIGINT NOT NULL,
-                    md5         CHAR(32),
-                    state       BYTEA
-                ) ON COMMIT DROP;
-                CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
-                """
-            cursor.execute(stmt)
+            self._make_temp_table(cursor)
             return conn, cursor
         except:
             self.close(conn, cursor)
             raise
 
+    def restart_store(self, cursor):
+        """Reuse a store connection."""
+        try:
+            cursor.connection.rollback()
+            if self._twophase:
+                cursor.connection.set_isolation_level(
+                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+            self._make_temp_table(cursor)
+        except (psycopg2.OperationalError, psycopg2.InterfaceError):
+            raise StorageError("database disconnected")
+
     def store_temp(self, cursor, oid, prev_tid, md5sum, data):
-        """Store an object."""
+        """Store an object in the temporary table."""
         stmt = """
         INSERT INTO temp_store (zoid, prev_tid, md5, state)
         VALUES (%s, %s, %s, decode(%s, 'base64'))
@@ -382,13 +397,19 @@
 
     def start_commit(self, cursor):
         """Prepare to commit."""
+        # Hold commit_lock to prevent concurrent commits
+        # (for as short a time as possible).
+        # Lock current_object in share mode to ensure conflict
+        # detection has the most current data.
+        cursor.execute("""
+        LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+        LOCK TABLE current_object IN SHARE MODE
+        """)
         cursor.execute("SAVEPOINT start_commit")
-        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
 
     def restart_commit(self, cursor):
-        """Rollback the attempt to commit and start over."""
+        """Rollback the attempt to commit and start again."""
         cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
-        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
 
     def get_tid_and_time(self, cursor):
         """Returns the most recent tid and the current database time.
@@ -533,7 +554,7 @@
         FROM transaction
         WHERE packed = FALSE
             AND tid != 0
-        ORDER BY tid desc
+        ORDER BY tid DESC
         """
         cursor.execute(stmt)
         return iter(cursor)
@@ -559,7 +580,7 @@
             JOIN object_state USING (tid)
         WHERE zoid = %s
             AND packed = FALSE
-        ORDER BY tid desc
+        ORDER BY tid DESC
         """
         cursor.execute(stmt, (oid,))
         return iter(cursor)
@@ -692,7 +713,7 @@
         FROM transaction
         WHERE tid > 0 AND tid <= %s
             AND packed = FALSE
-        ORDER BY tid desc
+        ORDER BY tid DESC
         LIMIT 1
         """
         cursor.execute(stmt, (pack_point,))

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2008-02-05 07:08:27 UTC (rev 83523)
+++ relstorage/trunk/relstorage/relstorage.py	2008-02-05 09:30:29 UTC (rev 83524)
@@ -51,12 +51,13 @@
         if create:
             self._adapter.prepare_schema()
 
-        # load_conn and load_cursor are usually open
+        # load_conn and load_cursor are open most of the time.
         self._load_conn = None
         self._load_cursor = None
         self._load_started = False
         self._open_load_connection()
-        # store_conn and store_cursor are open only during commit
+        # store_conn and store_cursor are open during commit,
+        # but not necessarily open at other times.
         self._store_conn = None
         self._store_cursor = None
 
@@ -289,8 +290,18 @@
             self._tstatus = status
 
             adapter = self._adapter
-            conn, cursor = adapter.open_for_store()
-            self._store_conn, self._store_cursor = conn, cursor
+            cursor = self._store_cursor
+            if cursor is not None:
+                # Store cursor is still open, so try to use it again.
+                try:
+                    adapter.restart_store(cursor)
+                except POSException.StorageError:
+                    cursor = None
+                    log.exception("Store connection failed; retrying")
+                    self._drop_store_connection()
+            if cursor is None:
+                conn, cursor = adapter.open_for_store()
+                self._store_conn, self._store_cursor = conn, cursor
 
             if tid is not None:
                 # get the commit lock and add the transaction now
@@ -371,7 +382,7 @@
         cursor = self._store_cursor
         adapter = self._adapter
 
-        # List conflicting changes.
+        # Detect conflicting changes.
         # Try to resolve the conflicts.
         resolved = set()  # a set of OIDs
         while True:
@@ -386,16 +397,18 @@
 
             rdata = self.tryToResolveConflict(oid, prev_tid, serial, data)
             if rdata is None:
+                # unresolvable; kill the whole transaction
                 raise POSException.ConflictError(
                     oid=oid, serials=(prev_tid, serial), data=data)
             else:
+                # resolved
                 data = rdata
                 md5sum = md5.new(data).hexdigest()
                 self._adapter.replace_temp(
                     cursor, oid_int, prev_tid_int, md5sum, data)
                 resolved.add(oid)
 
-        # Move the data
+        # Move the new states into the permanent table
         tid_int = u64(self._tid)
         serials = []
         oid_ints = adapter.move_from_temp(cursor, tid_int)
@@ -460,7 +473,6 @@
         txn = self._prepared_txn
         assert txn is not None
         self._adapter.commit_phase2(self._store_cursor, txn)
-        self._drop_store_connection()
         self._prepared_txn = None
         self._ltid = self._tid
         self._tid = None
@@ -471,7 +483,6 @@
         if self._store_cursor is not None:
             self._adapter.abort(self._store_cursor, self._prepared_txn)
         self._prepared_txn = None
-        self._drop_store_connection()
         self._tid = None
 
     def lastTransaction(self):

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2008-02-05 07:08:27 UTC (rev 83523)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2008-02-05 09:30:29 UTC (rev 83524)
@@ -206,3 +206,16 @@
         got, serialno = self._storage.load(oid, '')
         self.assertEqual(len(got), len(data))
         self.assertEqual(got, data)
+
+    def checkMultipleStores(self):
+        # Verify a connection can commit multiple transactions
+        db = DB(self._storage)
+        try:
+            c1 = db.open()
+            r1 = c1.root()
+            r1['alpha'] = 1
+            transaction.commit()
+            r1['alpha'] = 2
+            transaction.commit()
+        finally:
+            db.close()



More information about the Checkins mailing list