[Checkins] SVN: relstorage/trunk/ Auto-reconnect in new_oid(). Useful for long transactions.

Shane Hathaway shane at hathawaymix.org
Tue Feb 2 14:34:23 EST 2010


Log message for revision 108723:
  Auto-reconnect in new_oid().  Useful for long transactions.
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/schema.py
  U   relstorage/trunk/relstorage/storage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2010-02-02 16:50:49 UTC (rev 108722)
+++ relstorage/trunk/CHANGES.txt	2010-02-02 19:34:22 UTC (rev 108723)
@@ -1,7 +1,7 @@
 Next release
 ------------
 
-- ...
+- Auto-reconnect in new_oid().
 
 1.4.0b2 (2010-01-30)
 --------------------

Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py	2010-02-02 16:50:49 UTC (rev 108722)
+++ relstorage/trunk/relstorage/adapters/schema.py	2010-02-02 19:34:22 UTC (rev 108723)
@@ -773,8 +773,8 @@
                     engine = row[col_index]
                     if engine.lower() != 'innodb':
                         raise StorageError(
-                            "The object_state must use the InnoDB engine, "
-                            "but it is using the %s engine." % engine)
+                            "The object_state table must use the InnoDB "
+                            "engine, but it is using the %s engine." % engine)
 
 
 class OracleSchemaInstaller(AbstractSchemaInstaller):

Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py	2010-02-02 16:50:49 UTC (rev 108722)
+++ relstorage/trunk/relstorage/storage.py	2010-02-02 19:34:22 UTC (rev 108723)
@@ -216,22 +216,38 @@
         """Restart the load connection, creating a new connection if needed"""
         if self._load_cursor is None:
             self._open_load_connection()
-        else:
+            return
+        try:
+            self._adapter.connmanager.restart_load(
+                self._load_conn, self._load_cursor)
+            self._load_transaction_open = True
+        except self._adapter.connmanager.disconnected_exceptions, e:
+            log.warning("Reconnecting load_conn: %s", e)
+            self._drop_load_connection()
             try:
-                self._adapter.connmanager.restart_load(
-                    self._load_conn, self._load_cursor)
-            except self._adapter.connmanager.disconnected_exceptions, e:
-                log.warning("Reconnecting load_conn: %s", e)
-                self._drop_load_connection()
-                try:
-                    self._open_load_connection()
-                except:
-                    log.exception("Reconnect failed.")
-                    raise
-                else:
-                    log.info("Reconnected.")
-            self._load_transaction_open = True
+                self._open_load_connection()
+            except:
+                log.exception("Reconnect failed.")
+                raise
+            else:
+                log.info("Reconnected.")
 
+    def _with_load(self, f, *args, **kw):
+        """Call a function with the load connection and cursor."""
+        if self._load_cursor is None:
+            self._open_load_connection()
+        try:
+            return f(self._load_conn, self._load_cursor, *args, **kw)
+        except self._adapter.connmanager.disconnected_exceptions, e:
+            log.warning("Reconnecting load_conn: %s", e)
+            self._drop_load_connection()
+            try:
+                self._open_load_connection()
+            except:
+                log.exception("Reconnect failed.")
+                raise
+            log.info("Reconnected.")
+            return f(self._load_conn, self._load_cursor, *args, **kw)
 
     def _open_store_connection(self):
         """Open the store connection to the database.  Return nothing."""
@@ -249,21 +265,41 @@
         """Restart the store connection, creating a new connection if needed"""
         if self._store_cursor is None:
             self._open_store_connection()
-        else:
+            return
+        try:
+            self._adapter.connmanager.restart_store(
+                self._store_conn, self._store_cursor)
+        except self._adapter.connmanager.disconnected_exceptions, e:
+            log.warning("Reconnecting store_conn: %s", e)
+            self._drop_store_connection()
             try:
-                self._adapter.connmanager.restart_store(
-                    self._store_conn, self._store_cursor)
-            except self._adapter.connmanager.disconnected_exceptions, e:
-                log.warning("Reconnecting store_conn: %s", e)
-                self._drop_store_connection()
-                try:
-                    self._open_store_connection()
-                except:
-                    log.exception("Reconnect failed.")
-                    raise
-                else:
-                    log.info("Reconnected.")
+                self._open_store_connection()
+            except:
+                log.exception("Reconnect failed.")
+                raise
+            else:
+                log.info("Reconnected.")
 
+    def _with_store(self, f, *args, **kw):
+        """Call a function with the store connection and cursor."""
+        if self._store_cursor is None:
+            self._open_store_connection()
+        try:
+            return f(self._store_conn, self._store_cursor, *args, **kw)
+        except self._adapter.connmanager.disconnected_exceptions, e:
+            if self._transaction is not None:
+                # If transaction commit is in progress, it's too late
+                # to reconnect.
+                raise
+            log.warning("Reconnecting store_conn: %s", e)
+            self._drop_store_connection()
+            try:
+                self._open_store_connection()
+            except:
+                log.exception("Reconnect failed.")
+                raise
+            log.info("Reconnected.")
+            return f(self._store_conn, self._store_cursor, *args, **kw)
 
     def zap_all(self):
         """Clear all objects and transactions out of the database.
@@ -871,12 +907,9 @@
             if self._preallocated_oids:
                 oid_int = self._preallocated_oids.pop()
             else:
-                cursor = self._store_cursor
-                if cursor is None:
-                    self._open_store_connection()
-                    cursor = self._store_cursor
-                preallocated = list(
-                    self._adapter.oidallocator.new_oids(cursor))
+                def f(conn, cursor):
+                    return list(self._adapter.oidallocator.new_oids(cursor))
+                preallocated = self._with_store(f)
                 preallocated.sort(reverse=True)
                 oid_int = preallocated.pop()
                 self._preallocated_oids = preallocated
@@ -1203,22 +1236,9 @@
                 ignore_tid = None
 
             # get a list of changed OIDs and the most recent tid
-            poll = self._adapter.poller.poll_invalidations
             prev = self._prev_polled_tid
-            try:
-                changes, new_polled_tid = poll(
-                    self._load_conn, self._load_cursor, prev, ignore_tid)
-            except self._adapter.connmanager.disconnected_exceptions, e:
-                log.warning("Reconnecting load_conn: %s", e)
-                self._drop_load_connection()
-                try:
-                    self._open_load_connection()
-                except:
-                    log.exception("Reconnect failed.")
-                    raise
-                log.info("Reconnected.")
-                changes, new_polled_tid = poll(
-                    self._load_conn, self._load_cursor, prev, ignore_tid)
+            changes, new_polled_tid = self._with_load(
+                self._adapter.poller.poll_invalidations, prev, ignore_tid)
 
             self._cache.after_poll(
                 self._load_cursor, prev, new_polled_tid, changes)

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2010-02-02 16:50:49 UTC (rev 108722)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2010-02-02 19:34:22 UTC (rev 108723)
@@ -325,7 +325,8 @@
             self.assert_(c2 is c1)
             r = c2.root()
             self.assertEqual(r['alpha'], 1)
-            r['beta'] = 2
+            r['beta'] = PersistentMapping()
+            c2.add(r['beta'])
             transaction.commit()
             c2.close()
         finally:



More information about the checkins mailing list