[Checkins] SVN: relstorage/trunk/ Reworked the auto-reconnect logic
so that applications never see temporary database disconnects
Shane Hathaway
shane at hathawaymix.org
Fri Mar 7 02:01:49 EST 2008
Log message for revision 84523:
Reworked the auto-reconnect logic so that applications never see
temporary database disconnects if possible. Thanks to Rigel Di Scala
for pointing out this issue.
Changed:
U relstorage/trunk/CHANGELOG.txt
U relstorage/trunk/relstorage/adapters/common.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/relstorage.py
U relstorage/trunk/relstorage/tests/reltestbase.py
-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt 2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/CHANGELOG.txt 2008-03-07 07:01:47 UTC (rev 84523)
@@ -5,7 +5,16 @@
Now the script creates the tables if needed. Thanks to Flavio Coelho
for discovering this.
+- Reworked the auto-reconnect logic so that applications never see
+ temporary database disconnects if possible. Thanks to Rigel Di Scala
+ for pointing out this issue.
+- Improved the log messages explaining database connection failures.
+
+- Moved poll_invalidations to the common adapter base class, reducing the
+ amount of code to maintain.
+
+
RelStorage 1.0
- Added a utility for converting between storages called zodbconvert.
Modified: relstorage/trunk/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py 2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/adapters/common.py 2008-03-07 07:01:47 UTC (rev 84523)
@@ -679,3 +679,51 @@
finally:
self.close(conn, cursor)
+
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid).
+ """
+ # find out the tid of the most recent transaction.
+ cursor.execute(self._poll_query)
+ new_polled_tid = cursor.fetchone()[0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
+ cursor.execute(stmt % self._script_vars, {'tid': prev_polled_tid})
+ rows = cursor.fetchall()
+ if not rows:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache needs to be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ stmt = """
+ SELECT DISTINCT zoid
+ FROM object_state
+ WHERE tid > %(tid)s
+ """
+ if ignore_tid is None:
+ cursor.execute(stmt % self._script_vars, {'tid': prev_polled_tid})
+ else:
+ stmt += " AND tid != %(self_tid)s"
+ cursor.execute(stmt % self._script_vars,
+ {'tid': prev_polled_tid, 'self_tid': ignore_tid})
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2008-03-07 07:01:47 UTC (rev 84523)
@@ -211,8 +211,8 @@
cursor.execute("SET SESSION TRANSACTION %s" % transaction_mode)
conn.autocommit(False)
return conn, cursor
- except MySQLdb.OperationalError:
- log.warning("Unable to connect in %s", repr(self))
+ except MySQLdb.OperationalError, e:
+ log.warning("Unable to connect: %s", e)
raise
def close(self, conn, cursor):
@@ -223,7 +223,8 @@
try:
obj.close()
except (MySQLdb.InterfaceError,
- MySQLdb.OperationalError):
+ MySQLdb.OperationalError,
+ MySQLdb.ProgrammingError):
pass
def open_for_load(self):
@@ -234,9 +235,11 @@
return self.open("ISOLATION LEVEL REPEATABLE READ")
def restart_load(self, cursor):
- """After a rollback, reinitialize a connection for loading objects."""
- # No re-init necessary
- pass
+ """Reinitialize a connection for loading objects."""
+ try:
+ cursor.connection.rollback()
+ except (MySQLdb.OperationalError, MySQLdb.InterfaceError), e:
+ raise StorageError(e)
def get_object_count(self):
"""Returns the number of objects in the database"""
@@ -356,8 +359,8 @@
try:
cursor.connection.rollback()
cursor.execute("TRUNCATE temp_store")
- except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
- raise StorageError("database disconnected")
+ except (MySQLdb.OperationalError, MySQLdb.InterfaceError), e:
+ raise StorageError(e)
def store_temp(self, cursor, oid, prev_tid, md5sum, data):
"""Store an object in the temporary table."""
@@ -570,55 +573,4 @@
raise StorageError("Unable to acquire commit lock")
- def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
- """Polls for new transactions.
-
- conn and cursor must have been created previously by open_for_load().
- prev_polled_tid is the tid returned at the last poll, or None
- if this is the first poll. If ignore_tid is not None, changes
- committed in that transaction will not be included in the list
- of changed OIDs.
-
- Returns (changed_oids, new_polled_tid). Raises StorageError
- if the database has disconnected.
- """
- try:
- # find out the tid of the most recent transaction.
- stmt = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
- cursor.execute(stmt)
- # Expect the transaction table to always have at least one row.
- assert cursor.rowcount == 1
- new_polled_tid = cursor.fetchone()[0]
-
- if prev_polled_tid is None:
- # This is the first time the connection has polled.
- return None, new_polled_tid
-
- if new_polled_tid == prev_polled_tid:
- # No transactions have been committed since prev_polled_tid.
- return (), new_polled_tid
-
- stmt = "SELECT 1 FROM transaction WHERE tid = %s"
- cursor.execute(stmt, (prev_polled_tid,))
- if not cursor.rowcount:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
-
- # Get the list of changed OIDs and return it.
- stmt = """
- SELECT DISTINCT zoid
- FROM object_state
- JOIN transaction USING (tid)
- WHERE tid > %s
- """
- if ignore_tid is not None:
- stmt += " AND tid != %d" % ignore_tid
- cursor.execute(stmt, (prev_polled_tid,))
- oids = [oid for (oid,) in cursor]
-
- return oids, new_polled_tid
-
- except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
- raise StorageError("database disconnected")
-
+ _poll_query = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2008-03-07 07:01:47 UTC (rev 84523)
@@ -270,8 +270,8 @@
cursor.execute("SET TRANSACTION %s" % transaction_mode)
return conn, cursor
- except cx_Oracle.OperationalError:
- log.warning("Unable to connect to DSN %s", self._params[2])
+ except cx_Oracle.OperationalError, e:
+ log.warning("Unable to connect: %s", e)
raise
def close(self, conn, cursor):
@@ -292,8 +292,12 @@
return self.open('READ ONLY')
def restart_load(self, cursor):
- """After a rollback, reinitialize a connection for loading objects."""
- cursor.execute("SET TRANSACTION READ ONLY")
+ """Reinitialize a connection for loading objects."""
+ try:
+ cursor.connection.rollback()
+ cursor.execute("SET TRANSACTION READ ONLY")
+ except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError), e:
+ raise StorageError(e)
def get_object_count(self):
"""Returns the number of objects in the database"""
@@ -436,8 +440,8 @@
cursor.connection.rollback()
if self._twophase:
self._set_xid(cursor)
- except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
- raise StorageError("database disconnected")
+ except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError), e:
+ raise StorageError(e)
def store_temp(self, cursor, oid, prev_tid, md5sum, data):
"""Store an object in the temporary table."""
@@ -665,58 +669,9 @@
cursor.executemany(stmt, add_rows)
- def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
- """Polls for new transactions.
+ _poll_query = "SELECT MAX(tid) FROM transaction"
- conn and cursor must have been created previously by open_for_load().
- prev_polled_tid is the tid returned at the last poll, or None
- if this is the first poll. If ignore_tid is not None, changes
- committed in that transaction will not be included in the list
- of changed OIDs.
- Returns (changed_oids, new_polled_tid). Raises StorageError
- if the database has disconnected.
- """
- try:
- # find out the tid of the most recent transaction.
- stmt = "SELECT MAX(tid) FROM transaction"
- cursor.execute(stmt)
- new_polled_tid = list(cursor)[0][0]
-
- if prev_polled_tid is None:
- # This is the first time the connection has polled.
- return None, new_polled_tid
-
- if new_polled_tid == prev_polled_tid:
- # No transactions have been committed since prev_polled_tid.
- return (), new_polled_tid
-
- stmt = "SELECT 1 FROM transaction WHERE tid = :1"
- cursor.execute(stmt, (prev_polled_tid,))
- rows = cursor.fetchall()
- if not rows:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
-
- # Get the list of changed OIDs and return it.
- stmt = """
- SELECT DISTINCT zoid
- FROM object_state
- JOIN transaction USING (tid)
- WHERE tid > :1
- """
- if ignore_tid is not None:
- stmt += " AND tid != %d" % ignore_tid
- cursor.execute(stmt, (prev_polled_tid,))
- oids = [oid for (oid,) in cursor]
-
- return oids, new_polled_tid
-
- except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
- raise StorageError("database disconnected")
-
-
class TrackingMap:
"""Provides values for keys while tracking which keys are accessed."""
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2008-03-07 07:01:47 UTC (rev 84523)
@@ -177,8 +177,8 @@
conn.set_isolation_level(isolation)
cursor = conn.cursor()
cursor.arraysize = 64
- except psycopg2.OperationalError:
- log.warning("Unable to connect in %s", repr(self))
+ except psycopg2.OperationalError, e:
+ log.warning("Unable to connect: %s", e)
raise
return conn, cursor
@@ -211,9 +211,11 @@
return conn, cursor
def restart_load(self, cursor):
- """After a rollback, reinitialize a connection for loading objects."""
- # No re-init necessary
- pass
+ """Reinitialize a connection for loading objects."""
+ try:
+ cursor.connection.rollback()
+ except (psycopg2.OperationalError, psycopg2.InterfaceError), e:
+ raise StorageError(e)
def get_object_count(self):
"""Returns the number of objects in the database"""
@@ -367,8 +369,8 @@
cursor.connection.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
self._make_temp_table(cursor)
- except (psycopg2.OperationalError, psycopg2.InterfaceError):
- raise StorageError("database disconnected")
+ except (psycopg2.OperationalError, psycopg2.InterfaceError), e:
+ raise StorageError(e)
def store_temp(self, cursor, oid, prev_tid, md5sum, data):
"""Store an object in the temporary table."""
@@ -573,55 +575,4 @@
# No action needed
pass
-
- def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
- """Polls for new transactions.
-
- conn and cursor must have been created previously by open_for_load().
- prev_polled_tid is the tid returned at the last poll, or None
- if this is the first poll. If ignore_tid is not None, changes
- committed in that transaction will not be included in the list
- of changed OIDs.
-
- Returns (changed_oids, new_polled_tid). Raises StorageError
- if the database has disconnected.
- """
- try:
- # find out the tid of the most recent transaction.
- cursor.execute("EXECUTE get_latest_tid")
- # Expect the transaction table to always have at least one row.
- assert cursor.rowcount == 1
- new_polled_tid = cursor.fetchone()[0]
-
- if prev_polled_tid is None:
- # This is the first time the connection has polled.
- return None, new_polled_tid
-
- if new_polled_tid == prev_polled_tid:
- # No transactions have been committed since prev_polled_tid.
- return (), new_polled_tid
-
- stmt = "SELECT 1 FROM transaction WHERE tid = %s"
- cursor.execute(stmt, (prev_polled_tid,))
- if not cursor.rowcount:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
-
- # Get the list of changed OIDs and return it.
- stmt = """
- SELECT DISTINCT zoid
- FROM object_state
- JOIN transaction USING (tid)
- WHERE tid > %s
- """
- if ignore_tid is not None:
- stmt += " AND tid != %d" % ignore_tid
- cursor.execute(stmt, (prev_polled_tid,))
- oids = [oid for (oid,) in cursor]
-
- return oids, new_polled_tid
-
- except (psycopg2.OperationalError, psycopg2.InterfaceError):
- raise StorageError("database disconnected")
-
+ _poll_query = "EXECUTE get_latest_tid"
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/relstorage.py 2008-03-07 07:01:47 UTC (rev 84523)
@@ -100,27 +100,72 @@
self._load_transaction_open = True
def _drop_load_connection(self):
+ """Unconditionally drop the load connection"""
conn, cursor = self._load_conn, self._load_cursor
self._load_conn, self._load_cursor = None, None
self._adapter.close(conn, cursor)
+ self._load_transaction_open = False
- def _drop_store_connection(self):
- conn, cursor = self._store_conn, self._store_cursor
- self._store_conn, self._store_cursor = None, None
- self._adapter.close(conn, cursor)
-
def _rollback_load_connection(self):
if self._load_conn is not None:
- self._load_conn.rollback()
+ try:
+ self._load_conn.rollback()
+ except:
+ self._drop_load_connection()
+ raise
self._load_transaction_open = False
- def _start_load(self):
+ def _restart_load(self):
+ """Restart the load connection, creating a new connection if needed"""
if self._load_cursor is None:
self._open_load_connection()
else:
- self._adapter.restart_load(self._load_cursor)
+ try:
+ self._adapter.restart_load(self._load_cursor)
+ except POSException.StorageError, e:
+ log.warning("Reconnecting load_conn: %s", e)
+ self._drop_load_connection()
+ try:
+ self._open_load_connection()
+ except:
+ log.exception("Reconnect failed.")
+ raise
+ else:
+ log.info("Reconnected.")
self._load_transaction_open = True
+
+ def _open_store_connection(self):
+ """Open the store connection to the database. Return nothing."""
+ conn, cursor = self._adapter.open_for_store()
+ self._drop_store_connection()
+ self._store_conn, self._store_cursor = conn, cursor
+
+ def _drop_store_connection(self):
+ """Unconditionally drop the store connection"""
+ conn, cursor = self._store_conn, self._store_cursor
+ self._store_conn, self._store_cursor = None, None
+ self._adapter.close(conn, cursor)
+
+ def _restart_store(self):
+ """Restart the store connection, creating a new connection if needed"""
+ if self._store_cursor is None:
+ self._open_store_connection()
+ else:
+ try:
+ self._adapter.restart_store(self._store_cursor)
+ except POSException.StorageError, e:
+ log.warning("Reconnecting store_conn: %s", e)
+ self._drop_store_connection()
+ try:
+ self._open_store_connection()
+ except:
+ log.exception("Reconnect failed.")
+ raise
+ else:
+ log.info("Reconnected.")
+
+
def zap_all(self):
"""Clear all objects and transactions out of the database.
@@ -155,6 +200,7 @@
def connection_closing(self):
"""Release resources."""
+ # Note that this is overridden in BoundRelStorage.
self._rollback_load_connection()
def __len__(self):
@@ -168,7 +214,7 @@
self._lock_acquire()
try:
if not self._load_transaction_open:
- self._start_load()
+ self._restart_load()
cursor = self._load_cursor
state, tid_int = self._adapter.load_current(cursor, u64(oid))
finally:
@@ -199,7 +245,7 @@
cursor = self._store_cursor
else:
if not self._load_transaction_open:
- self._start_load()
+ self._restart_load()
cursor = self._load_cursor
state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
if state is not None:
@@ -224,7 +270,7 @@
cursor = self._store_cursor
else:
if not self._load_transaction_open:
- self._start_load()
+ self._restart_load()
cursor = self._load_cursor
if not self._adapter.exists(cursor, u64(oid)):
raise KeyError(oid)
@@ -338,21 +384,11 @@
self._tstatus = status
adapter = self._adapter
- cursor = self._store_cursor
- if cursor is not None:
- # Store cursor is still open, so try to use it again.
- try:
- adapter.restart_store(cursor)
- except POSException.StorageError:
- cursor = None
- log.exception("Store connection failed; retrying")
- self._drop_store_connection()
- if cursor is None:
- conn, cursor = adapter.open_for_store()
- self._store_conn, self._store_cursor = conn, cursor
+ self._restart_store()
if tid is not None:
# get the commit lock and add the transaction now
+ cursor = self._store_cursor
packed = (status == 'p')
adapter.start_commit(cursor)
tid_int = u64(tid)
@@ -754,7 +790,6 @@
poll_interval=parent._poll_interval, pack_gc=parent._pack_gc)
# _prev_polled_tid contains the tid at the previous poll
self._prev_polled_tid = None
- self._showed_disconnect = False
self._poll_at = 0
def connection_closing(self):
@@ -773,7 +808,7 @@
finally:
self._lock_release()
- def poll_invalidations(self, retry=True):
+ def poll_invalidations(self):
"""Looks for OIDs of objects that changed since _prev_polled_tid
Returns {oid: 1}, or None if all objects need to be invalidated
@@ -795,43 +830,29 @@
# else poll now after resetting the timeout
self._poll_at = now + self._poll_interval
- try:
- self._rollback_load_connection()
- self._start_load()
- conn = self._load_conn
- cursor = self._load_cursor
+ self._restart_load()
+ conn = self._load_conn
+ cursor = self._load_cursor
- # Ignore changes made by the last transaction committed
- # by this connection.
- if self._ltid is not None:
- ignore_tid = u64(self._ltid)
- else:
- ignore_tid = None
+ # Ignore changes made by the last transaction committed
+ # by this connection.
+ if self._ltid is not None:
+ ignore_tid = u64(self._ltid)
+ else:
+ ignore_tid = None
- # get a list of changed OIDs and the most recent tid
- oid_ints, new_polled_tid = self._adapter.poll_invalidations(
- conn, cursor, self._prev_polled_tid, ignore_tid)
- self._prev_polled_tid = new_polled_tid
+ # get a list of changed OIDs and the most recent tid
+ oid_ints, new_polled_tid = self._adapter.poll_invalidations(
+ conn, cursor, self._prev_polled_tid, ignore_tid)
+ self._prev_polled_tid = new_polled_tid
- if oid_ints is None:
- oids = None
- else:
- oids = {}
- for oid_int in oid_ints:
- oids[p64(oid_int)] = 1
- return oids
- except POSException.StorageError:
- # disconnected
- self._poll_at = 0
- if not retry:
- raise
- if not self._showed_disconnect:
- log.warning("Lost connection in %s", repr(self))
- self._showed_disconnect = True
- self._open_load_connection()
- log.info("Reconnected in %s", repr(self))
- self._showed_disconnect = False
- return self.poll_invalidations(retry=False)
+ if oid_ints is None:
+ oids = None
+ else:
+ oids = {}
+ for oid_int in oid_ints:
+ oids[p64(oid_int)] = 1
+ return oids
finally:
self._lock_release()
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2008-03-06 22:45:24 UTC (rev 84522)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2008-03-07 07:01:47 UTC (rev 84523)
@@ -237,6 +237,26 @@
finally:
db.close()
+ def checkAutoReconnect(self):
+ # Verify auto-reconnect
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r = c1.root()
+ r['alpha'] = 1
+ transaction.commit()
+ c1.close()
+
+ c1._storage._load_conn.close()
+
+ c2 = db.open()
+ self.assert_(c2 is c1)
+ r = c2.root()
+ self.assertEqual(r['alpha'], 1)
+ c2.close()
+ finally:
+ db.close()
+
def checkPollInterval(self):
# Verify the poll_interval parameter causes RelStorage to
# delay invalidation polling.
More information about the Checkins
mailing list