[Checkins] SVN: relstorage/trunk/ Better behavior for async replication. Changes:
Shane Hathaway
shane at hathawaymix.org
Fri Sep 30 04:56:17 EST 2011
Log message for revision 123000:
Better behavior for async replication. Changes:
- When the database connection is stale (such as when RelStorage switches
to an asynchronous replica that is not yet up to date), RelStorage will
now raise ReadConflictError by default. Ideally, the application will
react to the error by transparently retrying the transaction, while
the database gets up to date. A subsequent transaction will no longer
be stale.
- Added the revert-when-stale option, which can disable the new
behavior for stale database connections. This option is intended
for highly available, read-only ZODB clients. It would
confuse users of read-write ZODB clients.
This is a complex change, so it is time to start the 1.6 series.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/README.txt
U relstorage/trunk/buildout.cfg
U relstorage/trunk/relstorage/adapters/interfaces.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/poller.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/cache.py
U relstorage/trunk/relstorage/component.xml
U relstorage/trunk/relstorage/options.py
U relstorage/trunk/relstorage/storage.py
U relstorage/trunk/relstorage/tests/hftestbase.py
U relstorage/trunk/relstorage/tests/hptestbase.py
U relstorage/trunk/relstorage/tests/reltestbase.py
U relstorage/trunk/relstorage/tests/testmysql.py
U relstorage/trunk/relstorage/tests/testoracle.py
U relstorage/trunk/relstorage/tests/testpostgresql.py
A relstorage/trunk/repltest/master/notes
A relstorage/trunk/repltest/slave/notes
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/CHANGES.txt 2011-09-30 09:56:16 UTC (rev 123000)
@@ -13,6 +13,18 @@
read-only database replica for load connections. This allows
RelStorage to use read-only database replicas whenever possible.
+- When the database connection is stale (such as when RelStorage switches
+ to an asynchronous replica that is not yet up to date), RelStorage will
+ now raise ReadConflictError by default. Ideally, the application will
+ react to the error by transparently retrying the transaction, while
+ the database gets up to date. A subsequent transaction will no longer
+ be stale.
+
+- Added the revert-when-stale option, which can disable the new
+ behavior for stale database connections. This option is intended
+ for highly available, read-only ZODB clients. It would
+ confuse users of read-write ZODB clients.
+
TODO: provide a default random memcache prefix that is consistent per database.
1.5.0 (2011-06-30)
@@ -31,8 +43,8 @@
- Oracle, PostgreSQL: Switch to storing ZODB blob in chunks up to 4GB
(the maximum supported by cx_Oracle) or 2GB (PostgreSQL maximum blob size)
to maximize blob reading and writing performance.
-
- The PostgreSQL blob_chunk schema changed to support this, see
+
+ The PostgreSQL blob_chunk schema changed to support this, see
notes/migrate-to-1.5.txt to update existing databases.
- zodbconvert: When copying a database containing blobs, ensure the source
Modified: relstorage/trunk/README.txt
===================================================================
--- relstorage/trunk/README.txt 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/README.txt 2011-09-30 09:56:16 UTC (rev 123000)
@@ -482,6 +482,31 @@
try to revert to the primary replica after the specified
timeout (in seconds). The default is 600, meaning 10 minutes.
+``revert-when-stale``
+ Specifies what to do when a database connection is stale.
+ This is especially applicable to asynchronously replicated
+ databases: RelStorage could switch to a replica that is not
+ yet up to date.
+
+ When ``revert-when-stale`` is ``false`` (the default) and the
+ database connection is stale, RelStorage will raise a
+ ReadConflictError if the application tries to read or write
+ anything. The application should react to the
+ ReadConflictError by retrying the transaction after a delay
+ (possibly multiple times.) Once the database catches
+ up, a subsequent transaction will see the update and the
+ ReadConflictError will not occur again.
+
+ When ``revert-when-stale`` is ``true`` and the database connection
+ is stale, RelStorage will log a warning, clear the affected
+ ZODB connection cache (to prevent consistency errors), and let
+ the application continue with database state from
+ an earlier transaction. This behavior is intended to be useful
+ for highly available, read-only ZODB clients. Enabling this
+ option on ZODB clients that read and write the database is
+ likely to cause confusion for users whose changes
+ seem to be temporarily reverted.
+
``poll-interval``
Defer polling the database for the specified maximum time interval,
in seconds. Set to 0 (the default) to always poll. Fractional
@@ -522,7 +547,7 @@
of what to pack, but no data is actually removed. After a pre-pack,
the pack_object, pack_state, and pack_state_tid tables are filled
with the list of object states and objects that would have been
- removed. If pack-gc is true, the object_ref table will also be fully
+ removed. If pack-gc is true, the object_ref table will also be fully
populated. The object_ref table can be queried to discover references
between stored objects.
Modified: relstorage/trunk/buildout.cfg
===================================================================
--- relstorage/trunk/buildout.cfg 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/buildout.cfg 2011-09-30 09:56:16 UTC (rev 123000)
@@ -1,6 +1,6 @@
[buildout]
develop = .
-base-parts = test python coverage-test coverage-report
+base-parts = test python omelette coverage-test coverage-report
parts = ${buildout:base-parts}
eggs = relstorage
psycopg2
@@ -17,6 +17,11 @@
eggs = ${buildout:eggs}
interpreter = py
+[omelette]
+recipe = collective.recipe.omelette
+eggs = ${buildout:eggs}
+ignores = setuptools
+
[coverage-test]
recipe = zc.recipe.testrunner
eggs = ${buildout:eggs}
@@ -29,3 +34,4 @@
z3c.coverage
scripts = coveragereport
arguments = ('coverage', 'coverage/report')
+
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -363,6 +363,10 @@
a list of (oid, tid) that have changed, or None to indicate
that the changes are too complex to list. new_polled_tid is
never None.
+
+ This method may raise ReadConflictError if the database has
+ reverted to an earlier transaction, which can happen
+ in an asynchronously replicated database.
"""
def list_changes(cursor, after_tid, last_tid):
@@ -466,4 +470,3 @@
class ReplicaClosedException(Exception):
"""The connection to the replica has been closed"""
-
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -129,7 +129,8 @@
poll_query=poll_query,
keep_history=self.keep_history,
runner=self.runner,
- )
+ revert_when_stale=options.revert_when_stale,
+ )
if self.keep_history:
self.packundo = MySQLHistoryPreservingPackUndo(
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -124,7 +124,8 @@
poll_query=poll_query,
keep_history=self.keep_history,
runner=self.runner,
- )
+ revert_when_stale=options.revert_when_stale,
+ )
if self.keep_history:
self.packundo = OracleHistoryPreservingPackUndo(
Modified: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/poller.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -12,19 +12,23 @@
#
##############################################################################
+from ZODB.POSException import ReadConflictError
from relstorage.adapters.interfaces import IPoller
from zope.interface import implements
import logging
+
log = logging.getLogger(__name__)
+
class Poller:
"""Database change notification poller"""
implements(IPoller)
- def __init__(self, poll_query, keep_history, runner):
+ def __init__(self, poll_query, keep_history, runner, revert_when_stale):
self.poll_query = poll_query
self.keep_history = keep_history
self.runner = runner
+ self.revert_when_stale = revert_when_stale
def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
"""Polls for new transactions.
@@ -59,28 +63,30 @@
# No transactions have been committed since prev_polled_tid.
return (), new_polled_tid
- if self.keep_history:
- # If the previously polled transaction no longer exists,
- # the cache is too old and needs to be cleared.
- # XXX Do we actually need to detect this condition? I think
- # if we delete this block of code, all the unreachable
- # objects will be garbage collected anyway. So, as a test,
- # there is no equivalent of this block of code for
- # history-free storage. If something goes wrong, then we'll
- # know there's some other edge condition we have to account
- # for.
- stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
- cursor.execute(intern(stmt % self.runner.script_vars),
- {'tid': prev_polled_tid})
- rows = cursor.fetchall()
- if not rows:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
+ elif new_polled_tid > prev_polled_tid:
+ # New transaction(s) have been added.
- # Get the list of changed OIDs and return it.
- if new_polled_tid > prev_polled_tid:
if self.keep_history:
+ # If the previously polled transaction no longer exists,
+ # the cache is too old and needs to be cleared.
+ # XXX Do we actually need to detect this condition? I think
+ # if we delete this block of code, all the unreachable
+ # objects will be garbage collected anyway. So, as a test,
+ # there is no equivalent of this block of code for
+ # history-free storage. If something goes wrong, then we'll
+ # know there's some other edge condition we have to account
+ # for.
+ stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
+ cursor.execute(intern(stmt % self.runner.script_vars),
+ {'tid': prev_polled_tid})
+ rows = cursor.fetchall()
+ if not rows:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache should be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ if self.keep_history:
stmt = """
SELECT zoid, tid
FROM current_object
@@ -104,21 +110,28 @@
return changes, new_polled_tid
else:
- # We moved backward in time. This can happen after failover
- # to an asynchronous slave that is not fully up to date. If
- # this was not caused by failover, this condition suggests that
- # transaction IDs are not being created in order, which can
- # lead to consistency violations.
- log.warning(
- "Detected backward time travel (old tid %d, new tid %d). "
- "This is acceptable if it was caused by failover to a "
- "read-only asynchronous slave, but otherwise it may "
- "indicate a problem.",
- prev_polled_tid, new_polled_tid)
- # Although we could handle this situation by looking at the
- # whole cPickleCache and invalidating only certain objects,
- # invalidating the whole cache is simpler.
- return None, new_polled_tid
+ # The database connection is stale. This can happen after
+ # reading an asynchronous slave that is not fully up to date.
+ # (It may also suggest that transaction IDs are not being created
+ # in order, which would be a serious bug leading to consistency
+ # violations.)
+ if self.revert_when_stale:
+ # This client prefers to revert to the old state.
+ log.warning(
+ "Reverting to stale transaction ID %d and clearing cache. "
+ "(prev_polled_tid=%d)",
+ new_polled_tid, prev_polled_tid)
+ # We have to invalidate the whole cPickleCache, otherwise
+ # the cache would be inconsistent with the reverted state.
+ return None, new_polled_tid
+ else:
+ # This client never wants to revert to stale data, so
+ # raise ReadConflictError to trigger a retry.
+ # We're probably just waiting for async replication
+ # to catch up, so retrying could do the trick.
+ raise ReadConflictError(
+ "The database connection is stale: new_polled_tid=%d, "
+ "prev_polled_tid=%d." % (new_polled_tid, prev_polled_tid))
def list_changes(self, cursor, after_tid, last_tid):
"""Return the (oid, tid) values changed in a range of transactions.
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -94,7 +94,8 @@
poll_query="EXECUTE get_latest_tid",
keep_history=self.keep_history,
runner=self.runner,
- )
+ revert_when_stale=options.revert_when_stale,
+ )
if self.keep_history:
self.packundo = HistoryPreservingPackUndo(
Modified: relstorage/trunk/relstorage/cache.py
===================================================================
--- relstorage/trunk/relstorage/cache.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/cache.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -467,13 +467,17 @@
new_checkpoints = (new_tid_int, new_tid_int)
allow_shift = False
+ # We want to keep the current checkpoints for speed, but we
+ # have to replace them (to avoid consistency violations)
+ # if certain conditions happen (like emptying the ZODB cache).
if (new_checkpoints == self.checkpoints
and changes is not None
and prev_tid_int
and prev_tid_int <= self.current_tid
and new_tid_int >= self.current_tid
):
- # Keep the checkpoints and update self.delta_after0.
+ # All the conditions for keeping the checkpoints were met,
+ # so just update self.delta_after0 and self.current_tid.
m = self.delta_after0
m_get = m.get
for oid_int, tid_int in changes:
@@ -482,6 +486,7 @@
m[oid_int] = tid_int
self.current_tid = new_tid_int
else:
+ # We have to replace the checkpoints.
cp0, cp1 = new_checkpoints
log.debug("Using new checkpoints: %d %d", cp0, cp1)
# Use the checkpoints specified by the cache.
@@ -491,20 +496,20 @@
if cp1 < new_tid_int:
# poller.list_changes provides an iterator of
# (oid, tid) where tid > after_tid and tid <= last_tid.
- changes = self.adapter.poller.list_changes(
+ change_list = self.adapter.poller.list_changes(
cursor, cp1, new_tid_int)
# Make a dictionary that contains, for each oid, the most
# recent tid listed in changes.
- changes_dict = {}
- if not isinstance(changes, list):
- changes = list(changes)
- changes.sort()
- for oid_int, tid_int in changes:
- changes_dict[oid_int] = tid_int
+ change_dict = {}
+ if not isinstance(change_list, list):
+ change_list = list(change_list)
+ change_list.sort()
+ for oid_int, tid_int in change_list:
+ change_dict[oid_int] = tid_int
# Put the changes in new_delta_after*.
- for oid_int, tid_int in changes_dict.iteritems():
+ for oid_int, tid_int in change_dict.iteritems():
if tid_int > cp0:
new_delta_after0[oid_int] = tid_int
elif tid_int > cp1:
Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/component.xml 2011-09-30 09:56:16 UTC (rev 123000)
@@ -41,6 +41,9 @@
<key name="replica-timeout" datatype="float" default="600.0">
<description>See the RelStorage README.txt file.</description>
</key>
+ <key name="revert_when_stale" datatype="boolean" default="false">
+ <description>See the RelStorage README.txt file.</description>
+ </key>
<key name="poll-interval" datatype="float" required="no">
<description>See the RelStorage README.txt file.</description>
</key>
Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/options.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -47,6 +47,7 @@
self.replica_conf = None
self.ro_replica_conf = None
self.replica_timeout = 600.0
+ self.revert_when_stale = False
self.poll_interval = 0
self.pack_gc = True
self.pack_prepack_only = False
Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/storage.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -16,21 +16,22 @@
Stores pickles in the database.
"""
-from persistent.TimeStamp import TimeStamp
-from relstorage.blobhelper import BlobHelper
-from relstorage.blobhelper import is_blob_record
-from relstorage.cache import StorageCache
-from relstorage.options import Options
+from ZODB import ConflictResolution
+from ZODB import POSException
from ZODB.BaseStorage import DataRecord
from ZODB.BaseStorage import TransactionRecord
-from ZODB import ConflictResolution
-from ZODB import POSException
from ZODB.FileStorage import FileIterator
from ZODB.POSException import POSKeyError
from ZODB.UndoLogCompatible import UndoLogCompatible
from ZODB.utils import p64
from ZODB.utils import u64
+from persistent.TimeStamp import TimeStamp
+from relstorage.blobhelper import BlobHelper
+from relstorage.blobhelper import is_blob_record
+from relstorage.cache import StorageCache
+from relstorage.options import Options
from zope.interface import implements
+import ZODB.interfaces
import base64
import cPickle
import logging
@@ -39,7 +40,6 @@
import threading
import time
import weakref
-import ZODB.interfaces
try:
from ZODB.interfaces import StorageStopIteration
@@ -143,6 +143,10 @@
# calling the database.
_batcher_row_limit = 100
+ # _stale_error is None most of the time. It's a ReadConflictError
+ # when the database connection is stale (due to async replication).
+ _stale_error = None
+
def __init__(self, adapter, name=None, create=None,
options=None, cache=None, blobhelper=None, **kwoptions):
self._adapter = adapter
@@ -328,7 +332,7 @@
self._cache.clear()
def release(self):
- """Release back end database sessions used by this storage instance.
+ """Release database sessions used by this storage instance.
"""
self._lock_acquire()
try:
@@ -446,6 +450,9 @@
logfunc('; '.join(msg))
def load(self, oid, version=''):
+ if self._stale_error is not None:
+ raise self._stale_error
+
oid_int = u64(oid)
cache = self._cache
@@ -471,7 +478,10 @@
raise POSKeyError(oid)
def getTid(self, oid):
- state, serial = self.load(oid, '')
+ if self._stale_error is not None:
+ raise self._stale_error
+
+ _state, serial = self.load(oid, '')
return serial
getSerial = getTid # ZODB 3.7
@@ -510,6 +520,9 @@
def loadBefore(self, oid, tid):
"""Return the most recent revision of oid before tid committed."""
+ if self._stale_error is not None:
+ raise self._stale_error
+
oid_int = u64(oid)
self._lock_acquire()
@@ -543,6 +556,8 @@
self._lock_release()
def store(self, oid, serial, data, version, transaction):
+ if self._stale_error is not None:
+ raise self._stale_error
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
@@ -580,6 +595,8 @@
# Like store(), but used for importing transactions. See the
# comments in FileStorage.restore(). The prev_txn optimization
# is not used.
+ if self._stale_error is not None:
+ raise self._stale_error
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
@@ -606,6 +623,8 @@
self._lock_release()
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
+ if self._stale_error is not None:
+ raise self._stale_error
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
@@ -626,6 +645,8 @@
self._txn_check_serials[oid] = serial
def tpc_begin(self, transaction, tid=None, status=' '):
+ if self._stale_error is not None:
+ raise self._stale_error
if self._is_read_only:
raise POSException.ReadOnlyError()
self._lock_acquire()
@@ -916,6 +937,8 @@
self._lock_release()
def new_oid(self):
+ if self._stale_error is not None:
+ raise self._stale_error
if self._is_read_only:
raise POSException.ReadOnlyError()
self._lock_acquire()
@@ -950,6 +973,8 @@
return self._adapter.keep_history
def undoLog(self, first=0, last=-20, filter=None):
+ if self._stale_error is not None:
+ raise self._stale_error
if last < 0:
last = first - last
@@ -980,6 +1005,8 @@
adapter.connmanager.close(conn, cursor)
def history(self, oid, version=None, size=1, filter=None):
+ if self._stale_error is not None:
+ raise self._stale_error
self._lock_acquire()
try:
cursor = self._load_cursor
@@ -1020,6 +1047,8 @@
the transaction.
"""
+ if self._stale_error is not None:
+ raise self._stale_error
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
@@ -1199,9 +1228,19 @@
prev = self._prev_polled_tid
# get a list of changed OIDs and the most recent tid
- changes, new_polled_tid = self._restart_load_and_call(
- self._adapter.poller.poll_invalidations, prev, ignore_tid)
+ try:
+ changes, new_polled_tid = self._restart_load_and_call(
+ self._adapter.poller.poll_invalidations, prev, ignore_tid)
+ except POSException.ReadConflictError, e:
+ # The database connection is stale, but postpone this
+ # error until the application tries to read or write something.
+ self._stale_error = e
+ # Always poll (override the poll_interval option).
+ self._poll_at = 0
+ return None, prev
+ self._stale_error = None
+
# Inform the cache of the changes.
self._cache.after_poll(
self._load_cursor, prev, new_polled_tid, changes)
@@ -1329,7 +1368,7 @@
elif isinstance(other, FileIterator):
# Create copy and ask for that for it's length so we do not
# exhaust the original iterator
- copy = FileIterator(other._file_name, other._start, other._stop,
+ copy = FileIterator(other._file_name, other._start, other._stop,
other._pos)
num_txns = len(list(copy))
else:
Modified: relstorage/trunk/relstorage/tests/hftestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/hftestbase.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/hftestbase.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -257,8 +257,7 @@
keep_history = False
def setUp(self):
- self.open(create=1)
- self._storage.zap_all()
+ self._storage = self.make_storage()
self._dst = FileStorage("Dest.fs", create=True)
def tearDown(self):
@@ -279,8 +278,6 @@
keep_history = False
def setUp(self):
- self.open(create=1)
- self._storage.zap_all()
self._dst = self._storage
self._storage = FileStorage("Source.fs", create=True)
Modified: relstorage/trunk/relstorage/tests/hptestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/hptestbase.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/hptestbase.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -193,15 +193,15 @@
return oid
def checkPackGCDisabled(self):
- self._storage._adapter.packundo.options.pack_gc = False
+ self._storage = self.make_storage(pack_gc=False)
self.checkPackGC(expect_object_deleted=False)
def checkPackGCPrePackOnly(self):
- self._storage._options.pack_prepack_only = True
+ self._storage = self.make_storage(pack_prepack_only=True)
self.checkPackGC(expect_object_deleted=False)
def checkPackGCReusePrePackData(self):
- self._storage._options.pack_prepack_only = True
+ self._storage = self.make_storage(pack_prepack_only=True)
oid = self.checkPackGC(expect_object_deleted=False)
# We now have pre-pack analysis data
self._storage._options.pack_prepack_only = False
@@ -243,13 +243,12 @@
class HistoryPreservingToFileStorage(
RelStorageTestBase,
UndoableRecoveryStorage,
- ):
+ ):
keep_history = True
def setUp(self):
- self.open(create=1)
- self._storage.zap_all()
+ self._storage = self.make_storage()
self._dst = FileStorage("Dest.fs", create=True)
def tearDown(self):
@@ -265,14 +264,12 @@
class HistoryPreservingFromFileStorage(
RelStorageTestBase,
UndoableRecoveryStorage,
- ):
+ ):
keep_history = True
def setUp(self):
- self.open(create=1)
- self._storage.zap_all()
- self._dst = self._storage
+ self._dst = self.make_storage()
self._storage = FileStorage("Source.fs", create=True)
def tearDown(self):
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -13,10 +13,8 @@
##############################################################################
"""A foundation for RelStorage tests"""
-from persistent import Persistent
-from persistent.mapping import PersistentMapping
-from relstorage.tests import fakecache
from ZODB.DB import DB
+from ZODB.POSException import ReadConflictError
from ZODB.serialize import referencesf
from ZODB.tests import BasicStorage
from ZODB.tests import ConflictResolution
@@ -26,10 +24,11 @@
from ZODB.tests import ReadOnlyStorage
from ZODB.tests import StorageTestBase
from ZODB.tests import Synchronization
-from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle
from ZODB.tests.StorageTestBase import zodb_unpickle
-from ZODB.utils import p64
+from persistent import Persistent
+from persistent.mapping import PersistentMapping
+from relstorage.tests import fakecache
import random
import time
import transaction
@@ -37,26 +36,61 @@
class RelStorageTestBase(StorageTestBase.StorageTestBase):
- def make_adapter(self):
- # abstract method
- raise NotImplementedError
+ keep_history = None # Override
+ _storage_created = None
- def open(self, **kwargs):
- from relstorage.storage import RelStorage
- adapter = self.make_adapter()
- self._storage = RelStorage(adapter, **kwargs)
- self._storage._batcher_row_limit = 1
-
def setUp(self):
- self.open(create=1)
- self._storage.zap_all()
+ pass
def tearDown(self):
transaction.abort()
- self._storage.close()
- self._storage.cleanup()
+ storage = self._storage
+ if storage is not None:
+ self._storage = None
+ storage.close()
+ storage.cleanup()
+ def get_storage(self):
+ # Create a storage with default options
+ # if it has not been created already.
+ storage = self._storage_created
+ if storage is None:
+ storage = self.make_storage()
+ self._storage_created = storage
+ return storage
+ def set_storage(self, storage):
+ self._storage_created = storage
+
+ _storage = property(get_storage, set_storage)
+
+ def make_adapter(self, options):
+ # abstract method
+ raise NotImplementedError()
+
+ def make_storage(self, zap=True, **kw):
+ from relstorage.options import Options
+ from relstorage.storage import RelStorage
+ options = Options(keep_history=self.keep_history, **kw)
+ adapter = self.make_adapter(options)
+ storage = RelStorage(adapter, options=options)
+ storage._batcher_row_limit = 1
+ if zap:
+ storage.zap_all()
+ return storage
+
+ def open(self, read_only=False):
+ # This is used by a few ZODB tests that close and reopen the storage.
+ storage = self._storage
+ if storage is not None:
+ self._storage = None
+ storage.close()
+ storage.cleanup()
+ self._storage = storage = self.make_storage(
+ read_only=read_only, zap=False)
+ return storage
+
+
class GenericRelStorageTests(
RelStorageTestBase,
BasicStorage.BasicStorage,
@@ -248,15 +282,18 @@
def checkUseCache(self):
# Store an object, cache it, then retrieve it from the cache
- self._storage._options.cache_servers = 'x:1 y:2'
- self._storage._options.cache_module_name = fakecache.__name__
- self._storage._options.cache_prefix = 'zzz'
+ self._storage = self.make_storage(
+ cache_servers='x:1 y:2',
+ cache_module_name=fakecache.__name__,
+ cache_prefix='zzz',
+ )
fakecache.data.clear()
db = DB(self._storage)
try:
c1 = db.open()
- self.assert_(c1._storage._cache.clients_global_first[0].servers,
+ self.assertEqual(
+ c1._storage._cache.clients_global_first[0].servers,
['x:1', 'y:2'])
r1 = c1.root()
# The root state and checkpoints should now be cached.
@@ -337,7 +374,9 @@
def checkPollInterval(self, shared_cache=True):
# Verify the poll_interval parameter causes RelStorage to
# delay invalidation polling.
- self._storage._options.poll_interval = 3600
+ self._storage = self.make_storage(
+ poll_interval=3600, share_local_cache=shared_cache)
+
db = DB(self._storage)
try:
tm1 = transaction.TransactionManager()
@@ -386,12 +425,12 @@
db.close()
def checkPollIntervalWithUnsharedCache(self):
- self._storage._options.share_local_cache = False
self.checkPollInterval(shared_cache=False)
def checkCachePolling(self):
- self._storage._options.poll_interval = 3600
- self._storage._options.share_local_cache = False
+ self._storage = self.make_storage(
+ poll_interval=3600, share_local_cache=False)
+
db = DB(self._storage)
try:
# Set up the database.
@@ -481,7 +520,8 @@
def checkPackBatchLockNoWait(self):
# Exercise the code in the pack algorithm that attempts to get the
# commit lock but will sleep if the lock is busy.
- self._storage._adapter.packundo.options.pack_batch_timeout = 0
+ self._storage = self.make_storage(pack_batch_timeout=0)
+
adapter = self._storage._adapter
test_conn, test_cursor = adapter.connmanager.open()
@@ -592,13 +632,14 @@
self.assertRaises(UnpicklingError, self._storage.pack,
time.time() + 10000, referencesf)
- def checkBackwardTimeTravel(self):
- # When a failover event causes the storage to switch to an
- # asynchronous slave that is not fully up to date, the poller
+ def checkBackwardTimeTravelWithoutRevertWhenStale(self):
+ # If revert_when_stale is false (the default), when the database
+ # connection is stale (such as through failover to an
+ # asynchronous slave that is not fully up to date), the poller
# should notice that backward time travel has occurred and
- # handle the situation by invalidating all objects that have
- # changed in the interval. (Currently, we simply invalidate all
- # objects when backward time travel occurs.)
+ # raise a ReadConflictError.
+ self._storage = self.make_storage(revert_when_stale=False)
+
import os
import shutil
import tempfile
@@ -630,6 +671,55 @@
finally:
shutil.rmtree(d)
+ # Sync, which will call poll_invalidations().
+ c.sync()
+
+ # Try to load an object, which should cause ReadConflictError.
+ r._p_deactivate()
+ self.assertRaises(ReadConflictError, lambda: r['beta'])
+
+ finally:
+ db.close()
+
+ def checkBackwardTimeTravelWithRevertWhenStale(self):
+ # If revert_when_stale is true, when the database
+ # connection is stale (such as through failover to an
+ # asynchronous slave that is not fully up to date), the poller
+ # should notice that backward time travel has occurred and
+ # invalidate all objects that have changed in the interval.
+ self._storage = self.make_storage(revert_when_stale=True)
+
+ import os
+ import shutil
+ import tempfile
+ from ZODB.FileStorage import FileStorage
+ db = DB(self._storage)
+ try:
+ c = db.open()
+ r = c.root()
+ r['alpha'] = PersistentMapping()
+ transaction.commit()
+
+ # To simulate failover to an out of date async slave, take
+ # a snapshot of the database at this point, change some
+ # object, then restore the database to its earlier state.
+
+ d = tempfile.mkdtemp()
+ try:
+ fs = FileStorage(os.path.join(d, 'Data.fs'))
+ fs.copyTransactionsFrom(c._storage)
+
+ r['beta'] = PersistentMapping()
+ transaction.commit()
+ self.assertTrue('beta' in r)
+
+ c._storage.zap_all()
+ c._storage.copyTransactionsFrom(fs)
+
+ fs.close()
+ finally:
+ shutil.rmtree(d)
+
# r should still be in the cache.
self.assertTrue('beta' in r)
Modified: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/testmysql.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -29,18 +29,19 @@
class UseMySQLAdapter:
- def make_adapter(self):
+
+ def make_adapter(self, options):
from relstorage.adapters.mysql import MySQLAdapter
if self.keep_history:
db = base_dbname
else:
db = base_dbname + '_hf'
return MySQLAdapter(
- options=Options(keep_history=self.keep_history),
+ options=options,
db=db,
user='relstoragetest',
passwd='relstoragetest',
- )
+ )
class ZConfigTests:
Modified: relstorage/trunk/relstorage/tests/testoracle.py
===================================================================
--- relstorage/trunk/relstorage/tests/testoracle.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/testoracle.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -30,7 +30,8 @@
class UseOracleAdapter:
- def make_adapter(self):
+
+ def make_adapter(self, options):
from relstorage.adapters.oracle import OracleAdapter
dsn = os.environ.get('ORACLE_TEST_DSN', 'XE')
if self.keep_history:
@@ -41,8 +42,8 @@
user=db,
password='relstoragetest',
dsn=dsn,
- options=Options(keep_history=self.keep_history),
- )
+ options=options,
+ )
class ZConfigTests:
Modified: relstorage/trunk/relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testpostgresql.py 2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/testpostgresql.py 2011-09-30 09:56:16 UTC (rev 123000)
@@ -29,7 +29,8 @@
class UsePostgreSQLAdapter:
- def make_adapter(self):
+
+ def make_adapter(self, options):
from relstorage.adapters.postgresql import PostgreSQLAdapter
if self.keep_history:
db = base_dbname
@@ -37,8 +38,8 @@
db = base_dbname + '_hf'
return PostgreSQLAdapter(
dsn='dbname=%s user=relstoragetest password=relstoragetest' % db,
- options=Options(keep_history=self.keep_history),
- )
+ options=options,
+ )
class ZConfigTests:
Added: relstorage/trunk/repltest/master/notes
===================================================================
--- relstorage/trunk/repltest/master/notes (rev 0)
+++ relstorage/trunk/repltest/master/notes 2011-09-30 09:56:16 UTC (rev 123000)
@@ -0,0 +1,6 @@
+CREATE DATABASE plone;
+GRANT ALL ON plone.* TO 'plone'@'%' IDENTIFIED BY 'plonepass';
+GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%' IDENTIFIED BY 'slavepass';
+FLUSH PRIVILEGES;
+RESET MASTER;
+SHOW MASTER STATUS;
Added: relstorage/trunk/repltest/slave/notes
===================================================================
--- relstorage/trunk/repltest/slave/notes (rev 0)
+++ relstorage/trunk/repltest/slave/notes 2011-09-30 09:56:16 UTC (rev 123000)
@@ -0,0 +1,11 @@
+CREATE DATABASE plone;
+GRANT ALL ON plone.* TO 'plone'@'%' IDENTIFIED BY 'plonepass';
+FLUSH PRIVILEGES;
+CHANGE MASTER TO
+ MASTER_HOST='192.168.1.61',
+ MASTER_USER='repl',
+ MASTER_PASSWORD='slavepass',
+ MASTER_LOG_FILE='mysql-bin.000001',
+ MASTER_LOG_POS=4,
+ MASTER_CONNECT_RETRY=10;
+START SLAVE;
More information about the checkins
mailing list