[Checkins] SVN: relstorage/trunk/ Added the ro-replica-conf option, which tells RelStorage to use a read-only database replica for load connections.
Shane Hathaway
shane at hathawaymix.org
Sat Sep 24 02:01:34 EST 2011
Log message for revision 122921:
Added the ro-replica-conf option, which tells RelStorage to use a
read-only database replica for load connections. This allows
RelStorage to use read-only database replicas whenever possible.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/README.txt
A relstorage/trunk/notes/quicktest.py
U relstorage/trunk/relstorage/adapters/connmanager.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/poller.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/adapters/replica.py
U relstorage/trunk/relstorage/adapters/tests/test_connmanager.py
U relstorage/trunk/relstorage/adapters/tests/test_replica.py
U relstorage/trunk/relstorage/component.xml
U relstorage/trunk/relstorage/options.py
A relstorage/trunk/relstorage/tests/ro_replicas.conf
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/CHANGES.txt 2011-09-24 07:01:32 UTC (rev 122921)
@@ -9,6 +9,10 @@
additional transactions at a later date, or update a non-live copy of your
database, copying over missing transactions.
+- Added the ro-replica-conf option, which tells RelStorage to use a
+ read-only database replica for load connections. This allows
+ RelStorage to use read-only database replicas whenever possible.
+
1.5.0 (2011-06-30)
------------------
Modified: relstorage/trunk/README.txt
===================================================================
--- relstorage/trunk/README.txt 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/README.txt 2011-09-24 07:01:32 UTC (rev 122921)
@@ -456,6 +456,26 @@
adapter will drop existing SQL database connections and make
new connections when ZODB starts a new transaction.
+``ro-replica-conf``
+ Like the ``replica-conf`` option, but the referenced text file
+ provides a list of database replicas to use only for read-only
+ load connections. This allows RelStorage to load objects from
+ read-only database replicas, while using read-write replicas
+ for all other database interactions.
+
+ If this option is not provided, load connections will fall back
+ to the replica pool specified by ``replica-conf``. If
+ ``ro-replica-conf`` is provided but ``replica-conf`` is not,
+ RelStorage will use replicas for load connections but not for
+ other database interactions.
+
+ Note that if read-only replicas are asynchronous, the next
+ interaction after a write operation might not be up to date.
+ When that happens, RelStorage will log a "backward time travel"
+ warning and clear the ZODB cache to prevent consistency errors.
+ This will likely result in temporarily reduced performance as
+ the ZODB cache is repopulated.
+
``replica-timeout``
If this option has a nonzero value, when the adapter selects
a replica other than the primary replica, the adapter will
Added: relstorage/trunk/notes/quicktest.py
===================================================================
--- relstorage/trunk/notes/quicktest.py (rev 0)
+++ relstorage/trunk/notes/quicktest.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -0,0 +1,19 @@
+import logging
+import sys
+format = '%(asctime)s [%(name)s] %(levelname)s %(message)s'
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=format)
+import transaction
+from relstorage.storage import RelStorage
+from relstorage.options import Options
+from relstorage.adapters.mysql import MySQLAdapter
+from ZODB.DB import DB
+options = Options()
+adapter = MySQLAdapter(db='shane', options=options)
+storage = RelStorage(adapter, options=options)
+db = DB(storage)
+conn = db.open()
+root = conn.root()
+root['x'] = root.get('x', 0) + 1
+transaction.commit()
+conn.close()
+db.pack()
Modified: relstorage/trunk/relstorage/adapters/connmanager.py
===================================================================
--- relstorage/trunk/relstorage/adapters/connmanager.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/connmanager.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -40,10 +40,17 @@
def __init__(self, options):
# options is a relstorage.options.Options instance
if options.replica_conf:
- self.replica_selector = ReplicaSelector(options)
+ self.replica_selector = ReplicaSelector(
+ options.replica_conf, options.replica_timeout)
else:
self.replica_selector = None
+ if options.ro_replica_conf:
+ self.ro_replica_selector = ReplicaSelector(
+ options.ro_replica_conf, options.replica_timeout)
+ else:
+ self.ro_replica_selector = self.replica_selector
+
def set_on_store_opened(self, f):
"""Set the on_store_opened hook"""
self.on_store_opened = f
@@ -88,13 +95,17 @@
def restart_load(self, conn, cursor):
"""Reinitialize a connection for loading objects."""
- self.check_replica(conn, cursor)
+ self.check_replica(conn, cursor,
+ replica_selector=self.ro_replica_selector)
conn.rollback()
- def check_replica(self, conn, cursor):
+ def check_replica(self, conn, cursor, replica_selector=None):
"""Raise an exception if the connection belongs to an old replica"""
- if self.replica_selector is not None:
- current = self.replica_selector.current()
+ if replica_selector is None:
+ replica_selector = self.replica_selector
+
+ if replica_selector is not None:
+ current = replica_selector.current()
if conn.replica != current:
# Prompt the change to a new replica by raising an exception.
self.close(conn, cursor)
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -187,40 +187,39 @@
close_exceptions = close_exceptions
def __init__(self, params, options):
- self._orig_params = params.copy()
- self._params = self._orig_params
- # _params_derived_from_replica contains the replica that
- # was used to set self._params.
- self._params_derived_from_replica = None
+ self._params = params.copy()
super(MySQLdbConnectionManager, self).__init__(options)
- def _set_params(self, replica):
+ def _alter_params(self, replica):
"""Alter the connection parameters to use the specified replica.
The replica parameter is a string specifying either host or host:port.
"""
- if replica != self._params_derived_from_replica:
- params = self._orig_params.copy()
- if ':' in replica:
- host, port = replica.split(':')
- params['host'] = host
- params['port'] = int(port)
- else:
- params['host'] = replica
- self._params_derived_from_replica = replica
- self._params = params
+ params = self._params.copy()
+ if ':' in replica:
+ host, port = replica.split(':')
+ params['host'] = host
+ params['port'] = int(port)
+ else:
+ params['host'] = replica
+ return params
- def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
+ def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
+ replica_selector=None):
"""Open a database connection and return (conn, cursor)."""
- if self.replica_selector is not None:
- replica = self.replica_selector.current()
- self._set_params(replica)
+ if replica_selector is None:
+ replica_selector = self.replica_selector
+
+ if replica_selector is not None:
+ replica = replica_selector.current()
+ params = self._alter_params(replica)
else:
replica = None
+ params = self._params
while True:
try:
- conn = MySQLdb.connect(**self._params)
+ conn = MySQLdb.connect(**params)
cursor = conn.cursor()
cursor.arraysize = 64
if transaction_mode:
@@ -234,14 +233,13 @@
if replica is not None:
log.warning("Unable to connect to replica %s: %s",
replica, e)
- else:
- log.warning("Unable to connect: %s", e)
- if self.replica_selector is not None:
- replica = self.replica_selector.next()
+ replica = replica_selector.next()
if replica is not None:
- # try the new replica
- self._set_params(replica)
+ # try the next replica
+ params = self._alter_params(replica)
continue
+ else:
+ log.warning("Unable to connect: %s", e)
raise
def open_for_load(self):
@@ -249,7 +247,8 @@
Returns (conn, cursor).
"""
- return self.open(self.isolation_repeatable_read)
+ return self.open(self.isolation_repeatable_read,
+ replica_selector=self.ro_replica_selector)
def open_for_pre_pack(self):
"""Open a connection to be used for the pre-pack phase.
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -283,16 +283,20 @@
super(CXOracleConnectionManager, self).__init__(options)
def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
- twophase=False):
+ twophase=False, replica_selector=None):
"""Open a database connection and return (conn, cursor)."""
- if self.replica_selector is not None:
- self._dsn = self.replica_selector.current()
+ if replica_selector is None:
+ replica_selector = self.replica_selector
+ if replica_selector is not None:
+ dsn = replica_selector.current()
+ else:
+ dsn = self._dsn
+
while True:
try:
kw = {'twophase': twophase, 'threaded': True}
- conn = cx_Oracle.connect(
- self._user, self._password, self._dsn, **kw)
+ conn = cx_Oracle.connect(self._user, self._password, dsn, **kw)
cursor = conn.cursor()
cursor.arraysize = 64
if transaction_mode:
@@ -300,12 +304,12 @@
return conn, cursor
except cx_Oracle.OperationalError, e:
- log.warning("Unable to connect to DSN %s: %s", self._dsn, e)
- if self.replica_selector is not None:
- replica = self.replica_selector.next()
+ log.warning("Unable to connect to DSN %s: %s", dsn, e)
+ if replica_selector is not None:
+ replica = replica_selector.next()
if replica is not None:
# try the new replica
- self._dsn = replica
+ dsn = replica
continue
raise
@@ -314,23 +318,28 @@
Returns (conn, cursor).
"""
- return self.open(self.isolation_read_only)
+ return self.open(self.isolation_read_only,
+ replica_selector=self.ro_replica_selector)
def restart_load(self, conn, cursor):
"""Reinitialize a connection for loading objects."""
- self.check_replica(conn, cursor)
+ self.check_replica(conn, cursor,
+ replica_selector=self.ro_replica_selector)
conn.rollback()
cursor.execute("SET TRANSACTION %s" % self.isolation_read_only)
- def check_replica(self, conn, cursor):
+ def check_replica(self, conn, cursor, replica_selector=None):
"""Raise an exception if the connection belongs to an old replica"""
- if self.replica_selector is not None:
- current = self.replica_selector.current()
+ if replica_selector is None:
+ replica_selector = self.replica_selector
+
+ if replica_selector is not None:
+ current = replica_selector.current()
if conn.dsn != current:
# Prompt the change to a new replica by raising an exception.
self.close(conn, cursor)
raise ReplicaClosedException(
- "Switching replica from %s to %s" % (conn.dsn, current))
+ "Switched replica from %s to %s" % (conn.dsn, current))
def _set_xid(self, conn, cursor):
"""Set up a distributed transaction"""
Modified: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/poller.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -106,7 +106,7 @@
else:
# We moved backward in time. This can happen after failover
# to an asynchronous slave that is not fully up to date. If
- # this was not caused by failover, it suggests that
+ # this was not caused by failover, this condition suggests that
# transaction IDs are not being created in order, which can
# lead to consistency violations.
log.warning(
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -157,39 +157,39 @@
close_exceptions = close_exceptions
def __init__(self, dsn, options):
- self._orig_dsn = dsn
self._dsn = dsn
self.keep_history = options.keep_history
- # _dsn_derived_from_replica contains the replica that
- # was used to set self._dsn.
- self._dsn_derived_from_replica = None
super(Psycopg2ConnectionManager, self).__init__(options)
- def _set_dsn(self, replica):
+ def _alter_dsn(self, replica):
"""Alter the DSN to use the specified replica.
The replica parameter is a string specifying either host or host:port.
"""
- if replica != self._dsn_derived_from_replica:
- if ':' in replica:
- host, port = replica.split(':')
- self._dsn = self._orig_dsn + ' host=%s port=%s' % (host, port)
- else:
- self._dsn = self._orig_dsn + ' host=%s' % replica
- self._dsn_derived_from_replica = replica
+ if ':' in replica:
+ host, port = replica.split(':')
+ dsn = '%s host=%s port=%s' % (self._dsn, host, port)
+ else:
+ dsn = '%s host=%s' % (self._dsn, replica)
+ return dsn
def open(self,
- isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
+ isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
+ replica_selector=None):
"""Open a database connection and return (conn, cursor)."""
- if self.replica_selector is not None:
- replica = self.replica_selector.current()
- self._set_dsn(replica)
+ if replica_selector is None:
+ replica_selector = self.replica_selector
+
+ if replica_selector is not None:
+ replica = replica_selector.current()
+ dsn = self._alter_dsn(replica)
else:
replica = None
+ dsn = self._dsn
while True:
try:
- conn = Psycopg2Connection(self._dsn)
+ conn = Psycopg2Connection(dsn)
conn.set_isolation_level(isolation)
cursor = conn.cursor()
cursor.arraysize = 64
@@ -199,14 +199,13 @@
if replica is not None:
log.warning("Unable to connect to replica %s: %s",
replica, e)
- else:
- log.warning("Unable to connect: %s", e)
- if self.replica_selector is not None:
- replica = self.replica_selector.next()
+ replica = replica_selector.next()
if replica is not None:
# try the new replica
- self._set_dsn(replica)
+ dsn = self._alter_dsn(replica)
continue
+ else:
+ log.warning("Unable to connect: %s", e)
raise
def open_for_load(self):
@@ -214,7 +213,8 @@
Returns (conn, cursor).
"""
- conn, cursor = self.open(self.isolation_serializable)
+ conn, cursor = self.open(self.isolation_serializable,
+ replica_selector=self.ro_replica_selector)
if self.keep_history:
stmt = """
PREPARE get_latest_tid AS
Modified: relstorage/trunk/relstorage/adapters/replica.py
===================================================================
--- relstorage/trunk/relstorage/adapters/replica.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/replica.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -17,12 +17,13 @@
import os
import time
+
class ReplicaSelector(object):
implements(IReplicaSelector)
- def __init__(self, options):
- self.replica_conf = options.replica_conf
- self.alt_timeout = options.replica_timeout
+ def __init__(self, fn, replica_timeout):
+ self.replica_conf = fn
+ self.replica_timeout = replica_timeout
self._read_config()
self._select(0)
self._iterating = False
@@ -59,8 +60,8 @@
def _select(self, index):
self._current_replica = self._replicas[index]
self._current_index = index
- if index > 0 and self.alt_timeout:
- self._expiration = time.time() + self.alt_timeout
+ if index > 0 and self.replica_timeout:
+ self._expiration = time.time() + self.replica_timeout
else:
self._expiration = None
Modified: relstorage/trunk/relstorage/adapters/tests/test_connmanager.py
===================================================================
--- relstorage/trunk/relstorage/adapters/tests/test_connmanager.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/tests/test_connmanager.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -56,10 +56,34 @@
self.assertRaises(ReplicaClosedException,
cm.restart_store, conn, MockCursor())
+ def test_with_ro_replica_conf(self):
+ import os
+ import relstorage.tests
+ tests_dir = relstorage.tests.__file__
+ replica_conf = os.path.join(os.path.dirname(tests_dir),
+ 'replicas.conf')
+ ro_replica_conf = os.path.join(os.path.dirname(tests_dir),
+ 'ro_replicas.conf')
+ options = MockOptions(replica_conf, ro_replica_conf)
+ from relstorage.adapters.connmanager \
+ import AbstractConnectionManager
+ from relstorage.adapters.interfaces import ReplicaClosedException
+ cm = AbstractConnectionManager(options)
+
+ conn = MockConnection()
+ conn.replica = 'readonlyhost'
+ cm.restart_load(conn, MockCursor())
+ self.assertTrue(conn.rolled_back)
+ conn.replica = 'other'
+ self.assertRaises(ReplicaClosedException,
+ cm.restart_load, conn, MockCursor())
+
+
class MockOptions:
- def __init__(self, fn=None):
+ def __init__(self, fn=None, ro_fn=None):
self.replica_conf = fn
+ self.ro_replica_conf = ro_fn
self.replica_timeout = 600.0
class MockConnection:
Modified: relstorage/trunk/relstorage/adapters/tests/test_replica.py
===================================================================
--- relstorage/trunk/relstorage/adapters/tests/test_replica.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/adapters/tests/test_replica.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -24,7 +24,6 @@
"# Replicas\n\nexample.com:1234\nlocalhost:4321\n"
"\nlocalhost:9999\n")
os.close(fd)
- self.options = MockOptions(self.fn)
def tearDown(self):
import os
@@ -32,19 +31,19 @@
def test__read_config_normal(self):
from relstorage.adapters.replica import ReplicaSelector
- rs = ReplicaSelector(self.options)
+ rs = ReplicaSelector(self.fn, 600.0)
self.assertEqual(rs._replicas,
['example.com:1234', 'localhost:4321', 'localhost:9999'])
def test__read_config_empty(self):
from relstorage.adapters.replica import ReplicaSelector
open(self.fn, 'w').close() # truncate the replica list file
- self.assertRaises(IndexError, ReplicaSelector, self.options)
+ self.assertRaises(IndexError, ReplicaSelector, self.fn, 600.0)
def test__is_config_modified(self):
from relstorage.adapters.replica import ReplicaSelector
import time
- rs = ReplicaSelector(self.options)
+ rs = ReplicaSelector(self.fn, 600.0)
self.assertEqual(rs._is_config_modified(), False)
# change the file
rs._config_modified = 0
@@ -57,7 +56,7 @@
def test__select(self):
from relstorage.adapters.replica import ReplicaSelector
- rs = ReplicaSelector(self.options)
+ rs = ReplicaSelector(self.fn, 600.0)
rs._select(0)
self.assertEqual(rs._current_replica, 'example.com:1234')
self.assertEqual(rs._current_index, 0)
@@ -69,7 +68,7 @@
def test_current(self):
from relstorage.adapters.replica import ReplicaSelector
- rs = ReplicaSelector(self.options)
+ rs = ReplicaSelector(self.fn, 600.0)
self.assertEqual(rs.current(), 'example.com:1234')
# change the file and get the new current replica
f = open(self.fn, 'w')
@@ -87,7 +86,7 @@
def test_next_iteration(self):
from relstorage.adapters.replica import ReplicaSelector
- rs = ReplicaSelector(self.options)
+ rs = ReplicaSelector(self.fn, 600.0)
# test forward iteration
self.assertEqual(rs.current(), 'example.com:1234')
@@ -109,13 +108,13 @@
f = open(self.fn, 'w')
f.write('localhost\n')
f.close()
- rs = ReplicaSelector(self.options)
+ rs = ReplicaSelector(self.fn, 600.0)
self.assertEqual(rs.current(), 'localhost')
self.assertEqual(rs.next(), None)
def test_next_with_new_conf(self):
from relstorage.adapters.replica import ReplicaSelector
- rs = ReplicaSelector(self.options)
+ rs = ReplicaSelector(self.fn, 600.0)
self.assertEqual(rs.current(), 'example.com:1234')
self.assertEqual(rs.next(), 'localhost:4321')
# interrupt the iteration by changing the replica conf file
@@ -128,11 +127,6 @@
self.assertEqual(rs.next(), None)
-class MockOptions:
- def __init__(self, fn):
- self.replica_conf = fn
- self.replica_timeout = 600.0
-
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ReplicaSelectorTests))
Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/component.xml 2011-09-24 07:01:32 UTC (rev 122921)
@@ -35,6 +35,9 @@
<key name="replica-conf" datatype="string" required="no">
<description>See the RelStorage README.txt file.</description>
</key>
+ <key name="ro-replica-conf" datatype="string" required="no">
+ <description>See the RelStorage README.txt file.</description>
+ </key>
<key name="replica-timeout" datatype="float" default="600.0">
<description>See the RelStorage README.txt file.</description>
</key>
Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py 2011-09-24 05:40:55 UTC (rev 122920)
+++ relstorage/trunk/relstorage/options.py 2011-09-24 07:01:32 UTC (rev 122921)
@@ -45,6 +45,7 @@
self.blob_chunk_size = 1 << 20
self.keep_history = True
self.replica_conf = None
+ self.ro_replica_conf = None
self.replica_timeout = 600.0
self.poll_interval = 0
self.pack_gc = True
Added: relstorage/trunk/relstorage/tests/ro_replicas.conf
===================================================================
--- relstorage/trunk/relstorage/tests/ro_replicas.conf (rev 0)
+++ relstorage/trunk/relstorage/tests/ro_replicas.conf 2011-09-24 07:01:32 UTC (rev 122921)
@@ -0,0 +1,2 @@
+# this is used by some of the tests
+readonlyhost
More information about the checkins mailing list