[Checkins] SVN: relstorage/trunk/ Reworked the auto-reconnect logic so that applications never see temporary database disconnects if possible.

Shane Hathaway shane at hathawaymix.org
Fri Mar 7 02:01:49 EST 2008


Log message for revision 84523:
  Reworked the auto-reconnect logic so that applications never see
  temporary database disconnects if possible.  Thanks to Rigel Di Scala
  for pointing out this issue.
  
  

Changed:
  U   relstorage/trunk/CHANGELOG.txt
  U   relstorage/trunk/relstorage/adapters/common.py
  U   relstorage/trunk/relstorage/adapters/mysql.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  U   relstorage/trunk/relstorage/relstorage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt	2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/CHANGELOG.txt	2008-03-07 07:01:47 UTC (rev 84523)
@@ -5,7 +5,16 @@
   Now the script creates the tables if needed.  Thanks to Flavio Coelho
   for discovering this.
 
+- Reworked the auto-reconnect logic so that applications never see
+  temporary database disconnects if possible.  Thanks to Rigel Di Scala
+  for pointing out this issue.
 
+- Improved the log messages explaining database connection failures.
+
+- Moved poll_invalidations to the common adapter base class, reducing the
+  amount of code to maintain.
+
+
 RelStorage 1.0
 
 - Added a utility for converting between storages called zodbconvert.

Modified: relstorage/trunk/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py	2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/adapters/common.py	2008-03-07 07:01:47 UTC (rev 84523)
@@ -679,3 +679,51 @@
 
         finally:
             self.close(conn, cursor)
+
+
+    def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+        """Polls for new transactions.
+
+        conn and cursor must have been created previously by open_for_load().
+        prev_polled_tid is the tid returned at the last poll, or None
+        if this is the first poll.  If ignore_tid is not None, changes
+        committed in that transaction will not be included in the list
+        of changed OIDs.
+
+        Returns (changed_oids, new_polled_tid).
+        """
+        # find out the tid of the most recent transaction.
+        cursor.execute(self._poll_query)
+        new_polled_tid = cursor.fetchone()[0]
+
+        if prev_polled_tid is None:
+            # This is the first time the connection has polled.
+            return None, new_polled_tid
+
+        if new_polled_tid == prev_polled_tid:
+            # No transactions have been committed since prev_polled_tid.
+            return (), new_polled_tid
+
+        stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
+        cursor.execute(stmt % self._script_vars, {'tid': prev_polled_tid})
+        rows = cursor.fetchall()
+        if not rows:
+            # Transaction not found; perhaps it has been packed.
+            # The connection cache needs to be cleared.
+            return None, new_polled_tid
+
+        # Get the list of changed OIDs and return it.
+        stmt = """
+        SELECT DISTINCT zoid
+        FROM object_state
+        WHERE tid > %(tid)s
+        """
+        if ignore_tid is None:
+            cursor.execute(stmt % self._script_vars, {'tid': prev_polled_tid})
+        else:
+            stmt += " AND tid != %(self_tid)s"
+            cursor.execute(stmt % self._script_vars,
+                {'tid': prev_polled_tid, 'self_tid': ignore_tid})
+        oids = [oid for (oid,) in cursor]
+
+        return oids, new_polled_tid

Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2008-03-07 07:01:47 UTC (rev 84523)
@@ -211,8 +211,8 @@
                 cursor.execute("SET SESSION TRANSACTION %s" % transaction_mode)
                 conn.autocommit(False)
             return conn, cursor
-        except MySQLdb.OperationalError:
-            log.warning("Unable to connect in %s", repr(self))
+        except MySQLdb.OperationalError, e:
+            log.warning("Unable to connect: %s", e)
             raise
 
     def close(self, conn, cursor):
@@ -223,7 +223,8 @@
                 try:
                     obj.close()
                 except (MySQLdb.InterfaceError,
-                        MySQLdb.OperationalError):
+                        MySQLdb.OperationalError,
+                        MySQLdb.ProgrammingError):
                     pass
 
     def open_for_load(self):
@@ -234,9 +235,11 @@
         return self.open("ISOLATION LEVEL REPEATABLE READ")
 
     def restart_load(self, cursor):
-        """After a rollback, reinitialize a connection for loading objects."""
-        # No re-init necessary
-        pass
+        """Reinitialize a connection for loading objects."""
+        try:
+            cursor.connection.rollback()
+        except (MySQLdb.OperationalError, MySQLdb.InterfaceError), e:
+            raise StorageError(e)
 
     def get_object_count(self):
         """Returns the number of objects in the database"""
@@ -356,8 +359,8 @@
         try:
             cursor.connection.rollback()
             cursor.execute("TRUNCATE temp_store")
-        except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
-            raise StorageError("database disconnected")
+        except (MySQLdb.OperationalError, MySQLdb.InterfaceError), e:
+            raise StorageError(e)
 
     def store_temp(self, cursor, oid, prev_tid, md5sum, data):
         """Store an object in the temporary table."""
@@ -570,55 +573,4 @@
             raise StorageError("Unable to acquire commit lock")
 
 
-    def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
-        """Polls for new transactions.
-
-        conn and cursor must have been created previously by open_for_load().
-        prev_polled_tid is the tid returned at the last poll, or None
-        if this is the first poll.  If ignore_tid is not None, changes
-        committed in that transaction will not be included in the list
-        of changed OIDs.
-
-        Returns (changed_oids, new_polled_tid).  Raises StorageError
-        if the database has disconnected.
-        """
-        try:
-            # find out the tid of the most recent transaction.
-            stmt = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
-            cursor.execute(stmt)
-            # Expect the transaction table to always have at least one row.
-            assert cursor.rowcount == 1
-            new_polled_tid = cursor.fetchone()[0]
-
-            if prev_polled_tid is None:
-                # This is the first time the connection has polled.
-                return None, new_polled_tid
-
-            if new_polled_tid == prev_polled_tid:
-                # No transactions have been committed since prev_polled_tid.
-                return (), new_polled_tid
-
-            stmt = "SELECT 1 FROM transaction WHERE tid = %s"
-            cursor.execute(stmt, (prev_polled_tid,))
-            if not cursor.rowcount:
-                # Transaction not found; perhaps it has been packed.
-                # The connection cache needs to be cleared.
-                return None, new_polled_tid
-
-            # Get the list of changed OIDs and return it.
-            stmt = """
-            SELECT DISTINCT zoid
-            FROM object_state
-                JOIN transaction USING (tid)
-            WHERE tid > %s
-            """
-            if ignore_tid is not None:
-                stmt += " AND tid != %d" % ignore_tid
-            cursor.execute(stmt, (prev_polled_tid,))
-            oids = [oid for (oid,) in cursor]
-
-            return oids, new_polled_tid
-
-        except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
-            raise StorageError("database disconnected")
-
+    _poll_query = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2008-03-07 07:01:47 UTC (rev 84523)
@@ -270,8 +270,8 @@
                 cursor.execute("SET TRANSACTION %s" % transaction_mode)
             return conn, cursor
 
-        except cx_Oracle.OperationalError:
-            log.warning("Unable to connect to DSN %s", self._params[2])
+        except cx_Oracle.OperationalError, e:
+            log.warning("Unable to connect: %s", e)
             raise
 
     def close(self, conn, cursor):
@@ -292,8 +292,12 @@
         return self.open('READ ONLY')
 
     def restart_load(self, cursor):
-        """After a rollback, reinitialize a connection for loading objects."""
-        cursor.execute("SET TRANSACTION READ ONLY")
+        """Reinitialize a connection for loading objects."""
+        try:
+            cursor.connection.rollback()
+            cursor.execute("SET TRANSACTION READ ONLY")
+        except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError), e:
+            raise StorageError(e)
 
     def get_object_count(self):
         """Returns the number of objects in the database"""
@@ -436,8 +440,8 @@
             cursor.connection.rollback()
             if self._twophase:
                 self._set_xid(cursor)
-        except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
-            raise StorageError("database disconnected")
+        except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError), e:
+            raise StorageError(e)
 
     def store_temp(self, cursor, oid, prev_tid, md5sum, data):
         """Store an object in the temporary table."""
@@ -665,58 +669,9 @@
         cursor.executemany(stmt, add_rows)
 
 
-    def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
-        """Polls for new transactions.
+    _poll_query = "SELECT MAX(tid) FROM transaction"
 
-        conn and cursor must have been created previously by open_for_load().
-        prev_polled_tid is the tid returned at the last poll, or None
-        if this is the first poll.  If ignore_tid is not None, changes
-        committed in that transaction will not be included in the list
-        of changed OIDs.
 
-        Returns (changed_oids, new_polled_tid).  Raises StorageError
-        if the database has disconnected.
-        """
-        try:
-            # find out the tid of the most recent transaction.
-            stmt = "SELECT MAX(tid) FROM transaction"
-            cursor.execute(stmt)
-            new_polled_tid = list(cursor)[0][0]
-
-            if prev_polled_tid is None:
-                # This is the first time the connection has polled.
-                return None, new_polled_tid
-
-            if new_polled_tid == prev_polled_tid:
-                # No transactions have been committed since prev_polled_tid.
-                return (), new_polled_tid
-
-            stmt = "SELECT 1 FROM transaction WHERE tid = :1"
-            cursor.execute(stmt, (prev_polled_tid,))
-            rows = cursor.fetchall()
-            if not rows:
-                # Transaction not found; perhaps it has been packed.
-                # The connection cache needs to be cleared.
-                return None, new_polled_tid
-
-            # Get the list of changed OIDs and return it.
-            stmt = """
-            SELECT DISTINCT zoid
-            FROM object_state
-                JOIN transaction USING (tid)
-            WHERE tid > :1
-            """
-            if ignore_tid is not None:
-                stmt += " AND tid != %d" % ignore_tid
-            cursor.execute(stmt, (prev_polled_tid,))
-            oids = [oid for (oid,) in cursor]
-
-            return oids, new_polled_tid
-
-        except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
-            raise StorageError("database disconnected")
-
-
 class TrackingMap:
     """Provides values for keys while tracking which keys are accessed."""
 

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2008-03-07 07:01:47 UTC (rev 84523)
@@ -177,8 +177,8 @@
             conn.set_isolation_level(isolation)
             cursor = conn.cursor()
             cursor.arraysize = 64
-        except psycopg2.OperationalError:
-            log.warning("Unable to connect in %s", repr(self))
+        except psycopg2.OperationalError, e:
+            log.warning("Unable to connect: %s", e)
             raise
         return conn, cursor
 
@@ -211,9 +211,11 @@
         return conn, cursor
 
     def restart_load(self, cursor):
-        """After a rollback, reinitialize a connection for loading objects."""
-        # No re-init necessary
-        pass
+        """Reinitialize a connection for loading objects."""
+        try:
+            cursor.connection.rollback()
+        except (psycopg2.OperationalError, psycopg2.InterfaceError), e:
+            raise StorageError(e)
 
     def get_object_count(self):
         """Returns the number of objects in the database"""
@@ -367,8 +369,8 @@
                 cursor.connection.set_isolation_level(
                     psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
             self._make_temp_table(cursor)
-        except (psycopg2.OperationalError, psycopg2.InterfaceError):
-            raise StorageError("database disconnected")
+        except (psycopg2.OperationalError, psycopg2.InterfaceError), e:
+            raise StorageError(e)
 
     def store_temp(self, cursor, oid, prev_tid, md5sum, data):
         """Store an object in the temporary table."""
@@ -573,55 +575,4 @@
         # No action needed
         pass
 
-
-    def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
-        """Polls for new transactions.
-
-        conn and cursor must have been created previously by open_for_load().
-        prev_polled_tid is the tid returned at the last poll, or None
-        if this is the first poll.  If ignore_tid is not None, changes
-        committed in that transaction will not be included in the list
-        of changed OIDs.
-
-        Returns (changed_oids, new_polled_tid).  Raises StorageError
-        if the database has disconnected.
-        """
-        try:
-            # find out the tid of the most recent transaction.
-            cursor.execute("EXECUTE get_latest_tid")
-            # Expect the transaction table to always have at least one row.
-            assert cursor.rowcount == 1
-            new_polled_tid = cursor.fetchone()[0]
-
-            if prev_polled_tid is None:
-                # This is the first time the connection has polled.
-                return None, new_polled_tid
-
-            if new_polled_tid == prev_polled_tid:
-                # No transactions have been committed since prev_polled_tid.
-                return (), new_polled_tid
-
-            stmt = "SELECT 1 FROM transaction WHERE tid = %s"
-            cursor.execute(stmt, (prev_polled_tid,))
-            if not cursor.rowcount:
-                # Transaction not found; perhaps it has been packed.
-                # The connection cache needs to be cleared.
-                return None, new_polled_tid
-
-            # Get the list of changed OIDs and return it.
-            stmt = """
-            SELECT DISTINCT zoid
-            FROM object_state
-                JOIN transaction USING (tid)
-            WHERE tid > %s
-            """
-            if ignore_tid is not None:
-                stmt += " AND tid != %d" % ignore_tid
-            cursor.execute(stmt, (prev_polled_tid,))
-            oids = [oid for (oid,) in cursor]
-
-            return oids, new_polled_tid
-
-        except (psycopg2.OperationalError, psycopg2.InterfaceError):
-            raise StorageError("database disconnected")
-
+    _poll_query = "EXECUTE get_latest_tid"

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/relstorage.py	2008-03-07 07:01:47 UTC (rev 84523)
@@ -100,27 +100,72 @@
         self._load_transaction_open = True
 
     def _drop_load_connection(self):
+        """Unconditionally drop the load connection"""
         conn, cursor = self._load_conn, self._load_cursor
         self._load_conn, self._load_cursor = None, None
         self._adapter.close(conn, cursor)
+        self._load_transaction_open = False
 
-    def _drop_store_connection(self):
-        conn, cursor = self._store_conn, self._store_cursor
-        self._store_conn, self._store_cursor = None, None
-        self._adapter.close(conn, cursor)
-
     def _rollback_load_connection(self):
         if self._load_conn is not None:
-            self._load_conn.rollback()
+            try:
+                self._load_conn.rollback()
+            except:
+                self._drop_load_connection()
+                raise
             self._load_transaction_open = False
 
-    def _start_load(self):
+    def _restart_load(self):
+        """Restart the load connection, creating a new connection if needed"""
         if self._load_cursor is None:
             self._open_load_connection()
         else:
-            self._adapter.restart_load(self._load_cursor)
+            try:
+                self._adapter.restart_load(self._load_cursor)
+            except POSException.StorageError, e:
+                log.warning("Reconnecting load_conn: %s", e)
+                self._drop_load_connection()
+                try:
+                    self._open_load_connection()
+                except:
+                    log.exception("Reconnect failed.")
+                    raise
+                else:
+                    log.info("Reconnected.")
             self._load_transaction_open = True
 
+
+    def _open_store_connection(self):
+        """Open the store connection to the database.  Return nothing."""
+        conn, cursor = self._adapter.open_for_store()
+        self._drop_store_connection()
+        self._store_conn, self._store_cursor = conn, cursor
+
+    def _drop_store_connection(self):
+        """Unconditionally drop the store connection"""
+        conn, cursor = self._store_conn, self._store_cursor
+        self._store_conn, self._store_cursor = None, None
+        self._adapter.close(conn, cursor)
+
+    def _restart_store(self):
+        """Restart the store connection, creating a new connection if needed"""
+        if self._store_cursor is None:
+            self._open_store_connection()
+        else:
+            try:
+                self._adapter.restart_store(self._store_cursor)
+            except POSException.StorageError, e:
+                log.warning("Reconnecting store_conn: %s", e)
+                self._drop_store_connection()
+                try:
+                    self._open_store_connection()
+                except:
+                    log.exception("Reconnect failed.")
+                    raise
+                else:
+                    log.info("Reconnected.")
+
+
     def zap_all(self):
         """Clear all objects and transactions out of the database.
 
@@ -155,6 +200,7 @@
 
     def connection_closing(self):
         """Release resources."""
+        # Note that this is overridden in BoundRelStorage.
         self._rollback_load_connection()
 
     def __len__(self):
@@ -168,7 +214,7 @@
         self._lock_acquire()
         try:
             if not self._load_transaction_open:
-                self._start_load()
+                self._restart_load()
             cursor = self._load_cursor
             state, tid_int = self._adapter.load_current(cursor, u64(oid))
         finally:
@@ -199,7 +245,7 @@
                 cursor = self._store_cursor
             else:
                 if not self._load_transaction_open:
-                    self._start_load()
+                    self._restart_load()
                 cursor = self._load_cursor
             state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
             if state is not None:
@@ -224,7 +270,7 @@
                 cursor = self._store_cursor
             else:
                 if not self._load_transaction_open:
-                    self._start_load()
+                    self._restart_load()
                 cursor = self._load_cursor
             if not self._adapter.exists(cursor, u64(oid)):
                 raise KeyError(oid)
@@ -338,21 +384,11 @@
             self._tstatus = status
 
             adapter = self._adapter
-            cursor = self._store_cursor
-            if cursor is not None:
-                # Store cursor is still open, so try to use it again.
-                try:
-                    adapter.restart_store(cursor)
-                except POSException.StorageError:
-                    cursor = None
-                    log.exception("Store connection failed; retrying")
-                    self._drop_store_connection()
-            if cursor is None:
-                conn, cursor = adapter.open_for_store()
-                self._store_conn, self._store_cursor = conn, cursor
+            self._restart_store()
 
             if tid is not None:
                 # get the commit lock and add the transaction now
+                cursor = self._store_cursor
                 packed = (status == 'p')
                 adapter.start_commit(cursor)
                 tid_int = u64(tid)
@@ -754,7 +790,6 @@
             poll_interval=parent._poll_interval, pack_gc=parent._pack_gc)
         # _prev_polled_tid contains the tid at the previous poll
         self._prev_polled_tid = None
-        self._showed_disconnect = False
         self._poll_at = 0
 
     def connection_closing(self):
@@ -773,7 +808,7 @@
         finally:
             self._lock_release()
 
-    def poll_invalidations(self, retry=True):
+    def poll_invalidations(self):
         """Looks for OIDs of objects that changed since _prev_polled_tid
 
         Returns {oid: 1}, or None if all objects need to be invalidated
@@ -795,43 +830,29 @@
                 # else poll now after resetting the timeout
                 self._poll_at = now + self._poll_interval
 
-            try:
-                self._rollback_load_connection()
-                self._start_load()
-                conn = self._load_conn
-                cursor = self._load_cursor
+            self._restart_load()
+            conn = self._load_conn
+            cursor = self._load_cursor
 
-                # Ignore changes made by the last transaction committed
-                # by this connection.
-                if self._ltid is not None:
-                    ignore_tid = u64(self._ltid)
-                else:
-                    ignore_tid = None
+            # Ignore changes made by the last transaction committed
+            # by this connection.
+            if self._ltid is not None:
+                ignore_tid = u64(self._ltid)
+            else:
+                ignore_tid = None
 
-                # get a list of changed OIDs and the most recent tid
-                oid_ints, new_polled_tid = self._adapter.poll_invalidations(
-                    conn, cursor, self._prev_polled_tid, ignore_tid)
-                self._prev_polled_tid = new_polled_tid
+            # get a list of changed OIDs and the most recent tid
+            oid_ints, new_polled_tid = self._adapter.poll_invalidations(
+                conn, cursor, self._prev_polled_tid, ignore_tid)
+            self._prev_polled_tid = new_polled_tid
 
-                if oid_ints is None:
-                    oids = None
-                else:
-                    oids = {}
-                    for oid_int in oid_ints:
-                        oids[p64(oid_int)] = 1
-                return oids
-            except POSException.StorageError:
-                # disconnected
-                self._poll_at = 0
-                if not retry:
-                    raise
-                if not self._showed_disconnect:
-                    log.warning("Lost connection in %s", repr(self))
-                    self._showed_disconnect = True
-                self._open_load_connection()
-                log.info("Reconnected in %s", repr(self))
-                self._showed_disconnect = False
-                return self.poll_invalidations(retry=False)
+            if oid_ints is None:
+                oids = None
+            else:
+                oids = {}
+                for oid_int in oid_ints:
+                    oids[p64(oid_int)] = 1
+            return oids
         finally:
             self._lock_release()
 

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2008-03-07 07:01:47 UTC (rev 84523)
@@ -237,6 +237,26 @@
         finally:
             db.close()
 
+    def checkAutoReconnect(self):
+        # Verify auto-reconnect
+        db = DB(self._storage)
+        try:
+            c1 = db.open()
+            r = c1.root()
+            r['alpha'] = 1
+            transaction.commit()
+            c1.close()
+
+            c1._storage._load_conn.close()
+
+            c2 = db.open()
+            self.assert_(c2 is c1)
+            r = c2.root()
+            self.assertEqual(r['alpha'], 1)
+            c2.close()
+        finally:
+            db.close()
+
     def checkPollInterval(self):
         # Verify the poll_interval parameter causes RelStorage to
         # delay invalidation polling.



More information about the Checkins mailing list