[Checkins] SVN: relstorage/trunk/ Added the ro-replica-conf option, which tells RelStorage to use a read-only database replica for load connections

Shane Hathaway shane at hathawaymix.org
Sat Sep 24 02:01:34 EST 2011


Log message for revision 122921:
  Added the ro-replica-conf option, which tells RelStorage to use a
  read-only database replica for load connections. This allows
  RelStorage to use read-only database replicas whenever possible.
  
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/README.txt
  A   relstorage/trunk/notes/quicktest.py
  U   relstorage/trunk/relstorage/adapters/connmanager.py
  U   relstorage/trunk/relstorage/adapters/mysql.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/poller.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  U   relstorage/trunk/relstorage/adapters/replica.py
  U   relstorage/trunk/relstorage/adapters/tests/test_connmanager.py
  U   relstorage/trunk/relstorage/adapters/tests/test_replica.py
  U   relstorage/trunk/relstorage/component.xml
  U   relstorage/trunk/relstorage/options.py
  A   relstorage/trunk/relstorage/tests/ro_replicas.conf

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/CHANGES.txt	2011-09-24 07:01:32 UTC (rev 122921)
@@ -9,6 +9,10 @@
   additional transactions at a later date, or update a non-live copy of your
   database, copying over missing transactions.
 
+- Added the ro-replica-conf option, which tells RelStorage to use a
+  read-only database replica for load connections. This allows
+  RelStorage to use read-only database replicas whenever possible.
+
 1.5.0 (2011-06-30)
 ------------------
 

Modified: relstorage/trunk/README.txt
===================================================================
--- relstorage/trunk/README.txt	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/README.txt	2011-09-24 07:01:32 UTC (rev 122921)
@@ -456,6 +456,26 @@
         adapter will drop existing SQL database connections and make
         new connections when ZODB starts a new transaction.
 
+``ro-replica-conf``
+        Like the ``replica-conf`` option, but the referenced text file
+        provides a list of database replicas to use only for read-only
+        load connections. This allows RelStorage to load objects from
+        read-only database replicas, while using read-write replicas
+        for all other database interactions.
+
+        If this option is not provided, load connections will fall back
+        to the replica pool specified by ``replica-conf``. If
+        ``ro-replica-conf`` is provided but ``replica-conf`` is not,
+        RelStorage will use replicas for load connections but not for
+        other database interactions.
+
+        Note that if read-only replicas are asynchronous, load connections
+        might not yet see the effects of a recent write operation.
+        When that happens, RelStorage will log a "backward time travel"
+        warning and clear the ZODB cache to prevent consistency errors.
+        This will likely result in temporarily reduced performance as
+        the ZODB cache is repopulated.
+
 ``replica-timeout``
         If this option has a nonzero value, when the adapter selects
         a replica other than the primary replica, the adapter will

Added: relstorage/trunk/notes/quicktest.py
===================================================================
--- relstorage/trunk/notes/quicktest.py	                        (rev 0)
+++ relstorage/trunk/notes/quicktest.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -0,0 +1,19 @@
+import logging
+import sys
+format = '%(asctime)s [%(name)s] %(levelname)s %(message)s'
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=format)
+import transaction
+from relstorage.storage import RelStorage
+from relstorage.options import Options
+from relstorage.adapters.mysql import MySQLAdapter
+from ZODB.DB import DB
+options = Options()
+adapter = MySQLAdapter(db='shane', options=options)
+storage = RelStorage(adapter, options=options)
+db = DB(storage)
+conn = db.open()
+root = conn.root()
+root['x'] = root.get('x', 0) + 1
+transaction.commit()
+conn.close()
+db.pack()

Modified: relstorage/trunk/relstorage/adapters/connmanager.py
===================================================================
--- relstorage/trunk/relstorage/adapters/connmanager.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/connmanager.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -40,10 +40,17 @@
     def __init__(self, options):
         # options is a relstorage.options.Options instance
         if options.replica_conf:
-            self.replica_selector = ReplicaSelector(options)
+            self.replica_selector = ReplicaSelector(
+                options.replica_conf, options.replica_timeout)
         else:
             self.replica_selector = None
 
+        if options.ro_replica_conf:
+            self.ro_replica_selector = ReplicaSelector(
+                options.ro_replica_conf, options.replica_timeout)
+        else:
+            self.ro_replica_selector = self.replica_selector
+
     def set_on_store_opened(self, f):
         """Set the on_store_opened hook"""
         self.on_store_opened = f
@@ -88,13 +95,17 @@
 
     def restart_load(self, conn, cursor):
         """Reinitialize a connection for loading objects."""
-        self.check_replica(conn, cursor)
+        self.check_replica(conn, cursor,
+            replica_selector=self.ro_replica_selector)
         conn.rollback()
 
-    def check_replica(self, conn, cursor):
+    def check_replica(self, conn, cursor, replica_selector=None):
         """Raise an exception if the connection belongs to an old replica"""
-        if self.replica_selector is not None:
-            current = self.replica_selector.current()
+        if replica_selector is None:
+            replica_selector = self.replica_selector
+
+        if replica_selector is not None:
+            current = replica_selector.current()
             if conn.replica != current:
                 # Prompt the change to a new replica by raising an exception.
                 self.close(conn, cursor)

Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -187,40 +187,39 @@
     close_exceptions = close_exceptions
 
     def __init__(self, params, options):
-        self._orig_params = params.copy()
-        self._params = self._orig_params
-        # _params_derived_from_replica contains the replica that
-        # was used to set self._params.
-        self._params_derived_from_replica = None
+        self._params = params.copy()
         super(MySQLdbConnectionManager, self).__init__(options)
 
-    def _set_params(self, replica):
+    def _alter_params(self, replica):
         """Alter the connection parameters to use the specified replica.
 
         The replica parameter is a string specifying either host or host:port.
         """
-        if replica != self._params_derived_from_replica:
-            params = self._orig_params.copy()
-            if ':' in replica:
-                host, port = replica.split(':')
-                params['host'] = host
-                params['port'] = int(port)
-            else:
-                params['host'] = replica
-            self._params_derived_from_replica = replica
-            self._params = params
+        params = self._params.copy()
+        if ':' in replica:
+            host, port = replica.split(':')
+            params['host'] = host
+            params['port'] = int(port)
+        else:
+            params['host'] = replica
+        return params
 
-    def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
+    def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
+            replica_selector=None):
         """Open a database connection and return (conn, cursor)."""
-        if self.replica_selector is not None:
-            replica = self.replica_selector.current()
-            self._set_params(replica)
+        if replica_selector is None:
+            replica_selector = self.replica_selector
+
+        if replica_selector is not None:
+            replica = replica_selector.current()
+            params = self._alter_params(replica)
         else:
             replica = None
+            params = self._params
 
         while True:
             try:
-                conn = MySQLdb.connect(**self._params)
+                conn = MySQLdb.connect(**params)
                 cursor = conn.cursor()
                 cursor.arraysize = 64
                 if transaction_mode:
@@ -234,14 +233,13 @@
                 if replica is not None:
                     log.warning("Unable to connect to replica %s: %s",
                         replica, e)
-                else:
-                    log.warning("Unable to connect: %s", e)
-                if self.replica_selector is not None:
-                    replica = self.replica_selector.next()
+                    replica = replica_selector.next()
                     if replica is not None:
-                        # try the new replica
-                        self._set_params(replica)
+                        # try the next replica
+                        params = self._alter_params(replica)
                         continue
+                else:
+                    log.warning("Unable to connect: %s", e)
                 raise
 
     def open_for_load(self):
@@ -249,7 +247,8 @@
 
         Returns (conn, cursor).
         """
-        return self.open(self.isolation_repeatable_read)
+        return self.open(self.isolation_repeatable_read,
+            replica_selector=self.ro_replica_selector)
 
     def open_for_pre_pack(self):
         """Open a connection to be used for the pre-pack phase.

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -283,16 +283,20 @@
         super(CXOracleConnectionManager, self).__init__(options)
 
     def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
-            twophase=False):
+            twophase=False, replica_selector=None):
         """Open a database connection and return (conn, cursor)."""
-        if self.replica_selector is not None:
-            self._dsn = self.replica_selector.current()
+        if replica_selector is None:
+            replica_selector = self.replica_selector
 
+        if replica_selector is not None:
+            dsn = replica_selector.current()
+        else:
+            dsn = self._dsn
+
         while True:
             try:
                 kw = {'twophase': twophase, 'threaded': True}
-                conn = cx_Oracle.connect(
-                    self._user, self._password, self._dsn, **kw)
+                conn = cx_Oracle.connect(self._user, self._password, dsn, **kw)
                 cursor = conn.cursor()
                 cursor.arraysize = 64
                 if transaction_mode:
@@ -300,12 +304,12 @@
                 return conn, cursor
 
             except cx_Oracle.OperationalError, e:
-                log.warning("Unable to connect to DSN %s: %s", self._dsn, e)
-                if self.replica_selector is not None:
-                    replica = self.replica_selector.next()
+                log.warning("Unable to connect to DSN %s: %s", dsn, e)
+                if replica_selector is not None:
+                    replica = replica_selector.next()
                     if replica is not None:
                         # try the new replica
-                        self._dsn = replica
+                        dsn = replica
                         continue
                 raise
 
@@ -314,23 +318,28 @@
 
         Returns (conn, cursor).
         """
-        return self.open(self.isolation_read_only)
+        return self.open(self.isolation_read_only,
+            replica_selector=self.ro_replica_selector)
 
     def restart_load(self, conn, cursor):
         """Reinitialize a connection for loading objects."""
-        self.check_replica(conn, cursor)
+        self.check_replica(conn, cursor,
+            replica_selector=self.ro_replica_selector)
         conn.rollback()
         cursor.execute("SET TRANSACTION %s" % self.isolation_read_only)
 
-    def check_replica(self, conn, cursor):
+    def check_replica(self, conn, cursor, replica_selector=None):
         """Raise an exception if the connection belongs to an old replica"""
-        if self.replica_selector is not None:
-            current = self.replica_selector.current()
+        if replica_selector is None:
+            replica_selector = self.replica_selector
+
+        if replica_selector is not None:
+            current = replica_selector.current()
             if conn.dsn != current:
                 # Prompt the change to a new replica by raising an exception.
                 self.close(conn, cursor)
                 raise ReplicaClosedException(
-                    "Switching replica from %s to %s" % (conn.dsn, current))
+                    "Switched replica from %s to %s" % (conn.dsn, current))
 
     def _set_xid(self, conn, cursor):
         """Set up a distributed transaction"""

Modified: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/poller.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -106,7 +106,7 @@
         else:
             # We moved backward in time. This can happen after failover
             # to an asynchronous slave that is not fully up to date. If
-            # this was not caused by failover, it suggests that
+            # this was not caused by failover, this condition suggests that
             # transaction IDs are not being created in order, which can
             # lead to consistency violations.
             log.warning(

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -157,39 +157,39 @@
     close_exceptions = close_exceptions
 
     def __init__(self, dsn, options):
-        self._orig_dsn = dsn
         self._dsn = dsn
         self.keep_history = options.keep_history
-        # _dsn_derived_from_replica contains the replica that
-        # was used to set self._dsn.
-        self._dsn_derived_from_replica = None
         super(Psycopg2ConnectionManager, self).__init__(options)
 
-    def _set_dsn(self, replica):
+    def _alter_dsn(self, replica):
         """Alter the DSN to use the specified replica.
 
         The replica parameter is a string specifying either host or host:port.
         """
-        if replica != self._dsn_derived_from_replica:
-            if ':' in replica:
-                host, port = replica.split(':')
-                self._dsn = self._orig_dsn + ' host=%s port=%s' % (host, port)
-            else:
-                self._dsn = self._orig_dsn + ' host=%s' % replica
-            self._dsn_derived_from_replica = replica
+        if ':' in replica:
+            host, port = replica.split(':')
+            dsn = '%s host=%s port=%s' % (self._dsn, host, port)
+        else:
+            dsn = '%s host=%s' % (self._dsn, replica)
+        return dsn
 
     def open(self,
-            isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
+            isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
+            replica_selector=None):
         """Open a database connection and return (conn, cursor)."""
-        if self.replica_selector is not None:
-            replica = self.replica_selector.current()
-            self._set_dsn(replica)
+        if replica_selector is None:
+            replica_selector = self.replica_selector
+
+        if replica_selector is not None:
+            replica = replica_selector.current()
+            dsn = self._alter_dsn(replica)
         else:
             replica = None
+            dsn = self._dsn
 
         while True:
             try:
-                conn = Psycopg2Connection(self._dsn)
+                conn = Psycopg2Connection(dsn)
                 conn.set_isolation_level(isolation)
                 cursor = conn.cursor()
                 cursor.arraysize = 64
@@ -199,14 +199,13 @@
                 if replica is not None:
                     log.warning("Unable to connect to replica %s: %s",
                         replica, e)
-                else:
-                    log.warning("Unable to connect: %s", e)
-                if self.replica_selector is not None:
-                    replica = self.replica_selector.next()
+                    replica = replica_selector.next()
                     if replica is not None:
                         # try the new replica
-                        self._set_dsn(replica)
+                        dsn = self._alter_dsn(replica)
                         continue
+                else:
+                    log.warning("Unable to connect: %s", e)
                 raise
 
     def open_for_load(self):
@@ -214,7 +213,8 @@
 
         Returns (conn, cursor).
         """
-        conn, cursor = self.open(self.isolation_serializable)
+        conn, cursor = self.open(self.isolation_serializable,
+            replica_selector=self.ro_replica_selector)
         if self.keep_history:
             stmt = """
             PREPARE get_latest_tid AS

Modified: relstorage/trunk/relstorage/adapters/replica.py
===================================================================
--- relstorage/trunk/relstorage/adapters/replica.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/replica.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -17,12 +17,13 @@
 import os
 import time
 
+
 class ReplicaSelector(object):
     implements(IReplicaSelector)
 
-    def __init__(self, options):
-        self.replica_conf = options.replica_conf
-        self.alt_timeout = options.replica_timeout
+    def __init__(self, fn, replica_timeout):
+        self.replica_conf = fn
+        self.replica_timeout = replica_timeout
         self._read_config()
         self._select(0)
         self._iterating = False
@@ -59,8 +60,8 @@
     def _select(self, index):
         self._current_replica = self._replicas[index]
         self._current_index = index
-        if index > 0 and self.alt_timeout:
-            self._expiration = time.time() + self.alt_timeout
+        if index > 0 and self.replica_timeout:
+            self._expiration = time.time() + self.replica_timeout
         else:
             self._expiration = None
 

Modified: relstorage/trunk/relstorage/adapters/tests/test_connmanager.py
===================================================================
--- relstorage/trunk/relstorage/adapters/tests/test_connmanager.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/tests/test_connmanager.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -56,10 +56,34 @@
         self.assertRaises(ReplicaClosedException,
             cm.restart_store, conn, MockCursor())
 
+    def test_with_ro_replica_conf(self):
+        import os
+        import relstorage.tests
+        tests_dir = relstorage.tests.__file__
+        replica_conf = os.path.join(os.path.dirname(tests_dir),
+            'replicas.conf')
+        ro_replica_conf = os.path.join(os.path.dirname(tests_dir),
+            'ro_replicas.conf')
+        options = MockOptions(replica_conf, ro_replica_conf)
 
+        from relstorage.adapters.connmanager \
+            import AbstractConnectionManager
+        from relstorage.adapters.interfaces import ReplicaClosedException
+        cm = AbstractConnectionManager(options)
+
+        conn = MockConnection()
+        conn.replica = 'readonlyhost'
+        cm.restart_load(conn, MockCursor())
+        self.assertTrue(conn.rolled_back)
+        conn.replica = 'other'
+        self.assertRaises(ReplicaClosedException,
+            cm.restart_load, conn, MockCursor())
+
+
 class MockOptions:
-    def __init__(self, fn=None):
+    def __init__(self, fn=None, ro_fn=None):
         self.replica_conf = fn
+        self.ro_replica_conf = ro_fn
         self.replica_timeout = 600.0
 
 class MockConnection:

Modified: relstorage/trunk/relstorage/adapters/tests/test_replica.py
===================================================================
--- relstorage/trunk/relstorage/adapters/tests/test_replica.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/tests/test_replica.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -24,7 +24,6 @@
             "# Replicas\n\nexample.com:1234\nlocalhost:4321\n"
             "\nlocalhost:9999\n")
         os.close(fd)
-        self.options = MockOptions(self.fn)
 
     def tearDown(self):
         import os
@@ -32,19 +31,19 @@
 
     def test__read_config_normal(self):
         from relstorage.adapters.replica import ReplicaSelector
-        rs = ReplicaSelector(self.options)
+        rs = ReplicaSelector(self.fn, 600.0)
         self.assertEqual(rs._replicas,
             ['example.com:1234', 'localhost:4321', 'localhost:9999'])
 
     def test__read_config_empty(self):
         from relstorage.adapters.replica import ReplicaSelector
         open(self.fn, 'w').close()  # truncate the replica list file
-        self.assertRaises(IndexError, ReplicaSelector, self.options)
+        self.assertRaises(IndexError, ReplicaSelector, self.fn, 600.0)
 
     def test__is_config_modified(self):
         from relstorage.adapters.replica import ReplicaSelector
         import time
-        rs = ReplicaSelector(self.options)
+        rs = ReplicaSelector(self.fn, 600.0)
         self.assertEqual(rs._is_config_modified(), False)
         # change the file
         rs._config_modified = 0
@@ -57,7 +56,7 @@
 
     def test__select(self):
         from relstorage.adapters.replica import ReplicaSelector
-        rs = ReplicaSelector(self.options)
+        rs = ReplicaSelector(self.fn, 600.0)
         rs._select(0)
         self.assertEqual(rs._current_replica, 'example.com:1234')
         self.assertEqual(rs._current_index, 0)
@@ -69,7 +68,7 @@
 
     def test_current(self):
         from relstorage.adapters.replica import ReplicaSelector
-        rs = ReplicaSelector(self.options)
+        rs = ReplicaSelector(self.fn, 600.0)
         self.assertEqual(rs.current(), 'example.com:1234')
         # change the file and get the new current replica
         f = open(self.fn, 'w')
@@ -87,7 +86,7 @@
 
     def test_next_iteration(self):
         from relstorage.adapters.replica import ReplicaSelector
-        rs = ReplicaSelector(self.options)
+        rs = ReplicaSelector(self.fn, 600.0)
 
         # test forward iteration
         self.assertEqual(rs.current(), 'example.com:1234')
@@ -109,13 +108,13 @@
         f = open(self.fn, 'w')
         f.write('localhost\n')
         f.close()
-        rs = ReplicaSelector(self.options)
+        rs = ReplicaSelector(self.fn, 600.0)
         self.assertEqual(rs.current(), 'localhost')
         self.assertEqual(rs.next(), None)
 
     def test_next_with_new_conf(self):
         from relstorage.adapters.replica import ReplicaSelector
-        rs = ReplicaSelector(self.options)
+        rs = ReplicaSelector(self.fn, 600.0)
         self.assertEqual(rs.current(), 'example.com:1234')
         self.assertEqual(rs.next(), 'localhost:4321')
         # interrupt the iteration by changing the replica conf file
@@ -128,11 +127,6 @@
         self.assertEqual(rs.next(), None)
 
 
-class MockOptions:
-    def __init__(self, fn):
-        self.replica_conf = fn
-        self.replica_timeout = 600.0
-
 def test_suite():
     suite = unittest.TestSuite()
     suite.addTest(unittest.makeSuite(ReplicaSelectorTests))

Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/component.xml	2011-09-24 07:01:32 UTC (rev 122921)
@@ -35,6 +35,9 @@
     <key name="replica-conf" datatype="string" required="no">
       <description>See the RelStorage README.txt file.</description>
     </key>
+    <key name="ro-replica-conf" datatype="string" required="no">
+      <description>See the RelStorage README.txt file.</description>
+    </key>
     <key name="replica-timeout" datatype="float" default="600.0">
       <description>See the RelStorage README.txt file.</description>
     </key>

Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py	2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/options.py	2011-09-24 07:01:32 UTC (rev 122921)
@@ -45,6 +45,7 @@
         self.blob_chunk_size = 1 << 20
         self.keep_history = True
         self.replica_conf = None
+        self.ro_replica_conf = None
         self.replica_timeout = 600.0
         self.poll_interval = 0
         self.pack_gc = True

Added: relstorage/trunk/relstorage/tests/ro_replicas.conf
===================================================================
--- relstorage/trunk/relstorage/tests/ro_replicas.conf	                        (rev 0)
+++ relstorage/trunk/relstorage/tests/ro_replicas.conf	2011-09-24 07:01:32 UTC (rev 122921)
@@ -0,0 +1,2 @@
+# this is used by some of the tests
+readonlyhost



More information about the checkins mailing list