[Checkins] SVN: relstorage/trunk/ Better behavior for async replication. Changes:

Shane Hathaway shane at hathawaymix.org
Fri Sep 30 04:56:17 EST 2011


Log message for revision 123000:
  Better behavior for async replication.  Changes:
  
  - When the database connection is stale (such as when RelStorage switches
    to an asynchronous replica that is not yet up to date), RelStorage will
    now raise ReadConflictError by default. Ideally, the application will
    react to the error by transparently retrying the transaction while
    the database catches up. A subsequent transaction will no longer
    be stale.
  
  - Added the revert-when-stale option, which can disable the new
    behavior for stale database connections.  This option is intended
    for highly available, read-only ZODB clients.  It would
    confuse users of read-write ZODB clients.
  
  This is a complex change, so it is time to start the 1.6 series.
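The retry behavior the first bullet anticipates can be sketched outside ZODB. This is hypothetical application code: `ReadConflictError` is stubbed in place of `ZODB.POSException.ReadConflictError`, and `run_with_retries` stands in for whatever retry machinery (for example, Zope's built-in transaction retry) the application actually uses.

```python
import time

class ReadConflictError(Exception):
    """Stand-in for ZODB.POSException.ReadConflictError."""

def run_with_retries(txn_body, attempts=5, delay=0.1):
    """Retry txn_body when the storage reports a stale connection,
    giving an asynchronous replica time to catch up."""
    for attempt in range(attempts):
        try:
            return txn_body()
        except ReadConflictError:
            if attempt == attempts - 1:
                raise  # the replica never caught up; give up
            time.sleep(delay)

# Simulate a replica that is stale for the first two polls.
polls = {"count": 0}

def txn_body():
    polls["count"] += 1
    if polls["count"] < 3:
        raise ReadConflictError("database connection is stale")
    return "committed"

result = run_with_retries(txn_body, delay=0)
```

A real application would open a ZODB connection and commit inside `txn_body`; the point is only that the error is retryable rather than fatal.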
  
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/README.txt
  U   relstorage/trunk/buildout.cfg
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  U   relstorage/trunk/relstorage/adapters/mysql.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/poller.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  U   relstorage/trunk/relstorage/cache.py
  U   relstorage/trunk/relstorage/component.xml
  U   relstorage/trunk/relstorage/options.py
  U   relstorage/trunk/relstorage/storage.py
  U   relstorage/trunk/relstorage/tests/hftestbase.py
  U   relstorage/trunk/relstorage/tests/hptestbase.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py
  U   relstorage/trunk/relstorage/tests/testmysql.py
  U   relstorage/trunk/relstorage/tests/testoracle.py
  U   relstorage/trunk/relstorage/tests/testpostgresql.py
  A   relstorage/trunk/repltest/master/notes
  A   relstorage/trunk/repltest/slave/notes

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/CHANGES.txt	2011-09-30 09:56:16 UTC (rev 123000)
@@ -13,6 +13,18 @@
   read-only database replica for load connections. This allows
   RelStorage to use read-only database replicas whenever possible.
 
+- When the database connection is stale (such as when RelStorage switches
+  to an asynchronous replica that is not yet up to date), RelStorage will
+  now raise ReadConflictError by default. Ideally, the application will
+  react to the error by transparently retrying the transaction, while
+  the database gets up to date. A subsequent transaction will no longer
+  be stale.
+
+- Added the revert-when-stale option, which can disable the new
+  behavior for stale database connections.  This option is intended
+  for highly available, read-only ZODB clients.  It would
+  confuse users of read-write ZODB clients.
+
 TODO: provide a default random memcache prefix that is consistent per database.
 
 1.5.0 (2011-06-30)
@@ -31,8 +43,8 @@
 - Oracle, PostgreSQL: Switch to storing ZODB blob in chunks up to 4GB
   (the maximum supported by cx_Oracle) or 2GB (PostgreSQL maximum blob size)
   to maximize blob reading and writing performance.
-  
-  The PostgreSQL blob_chunk schema changed to support this, see 
+
+  The PostgreSQL blob_chunk schema changed to support this, see
   notes/migrate-to-1.5.txt to update existing databases.
 
 - zodbconvert: When copying a database containing blobs, ensure the source

Modified: relstorage/trunk/README.txt
===================================================================
--- relstorage/trunk/README.txt	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/README.txt	2011-09-30 09:56:16 UTC (rev 123000)
@@ -482,6 +482,31 @@
         try to revert to the primary replica after the specified
         timeout (in seconds).  The default is 600, meaning 10 minutes.
 
+``revert-when-stale``
+        Specifies what to do when a database connection is stale.
+        This is especially applicable to asynchronously replicated
+        databases: RelStorage could switch to a replica that is not
+        yet up to date.
+
+        When ``revert-when-stale`` is ``false`` (the default) and the
+        database connection is stale, RelStorage will raise a
+        ReadConflictError if the application tries to read or write
+        anything. The application should react to the
+        ReadConflictError by retrying the transaction after a delay
+        (possibly multiple times). Once the database catches
+        up, a subsequent transaction will see the update and the
+        ReadConflictError will not occur again.
+
+        When ``revert-when-stale`` is ``true`` and the database connection
+        is stale, RelStorage will log a warning, clear the affected
+        ZODB connection cache (to prevent consistency errors), and let
+        the application continue with database state from
+        an earlier transaction. This behavior is intended to be useful
+        for highly available, read-only ZODB clients. Enabling this
+        option on ZODB clients that read and write the database is
+        likely to cause confusion for users whose changes
+        seem to be temporarily reverted.
+
 ``poll-interval``
         Defer polling the database for the specified maximum time interval,
         in seconds.  Set to 0 (the default) to always poll.  Fractional
@@ -522,7 +547,7 @@
         of what to pack, but no data is actually removed.  After a pre-pack,
         the pack_object, pack_state, and pack_state_tid tables are filled
         with the list of object states and objects that would have been
-        removed.  If pack-gc is true, the object_ref table will also be fully 
+        removed.  If pack-gc is true, the object_ref table will also be fully
         populated. The object_ref table can be queried to discover references
         between stored objects.
 

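Wired into a ZConfig file (zope.conf or a standalone config), the new option would sit alongside the existing replica settings. A hypothetical sketch; the database name, DSN, and replica file path are invented:

```
%import relstorage
<zodb_db main>
  <relstorage>
    # Read-only client of an asynchronously replicated database:
    # silently revert instead of raising ReadConflictError.
    revert-when-stale true
    replica-conf /etc/replicas.conf
    <postgresql>
      dsn dbname='zodb'
    </postgresql>
  </relstorage>
</zodb_db>
```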
Modified: relstorage/trunk/buildout.cfg
===================================================================
--- relstorage/trunk/buildout.cfg	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/buildout.cfg	2011-09-30 09:56:16 UTC (rev 123000)
@@ -1,6 +1,6 @@
 [buildout]
 develop = .
-base-parts = test python coverage-test coverage-report
+base-parts = test python omelette coverage-test coverage-report
 parts = ${buildout:base-parts}
 eggs = relstorage
        psycopg2
@@ -17,6 +17,11 @@
 eggs = ${buildout:eggs}
 interpreter = py
 
+[omelette]
+recipe = collective.recipe.omelette
+eggs = ${buildout:eggs}
+ignores = setuptools
+
 [coverage-test]
 recipe = zc.recipe.testrunner
 eggs = ${buildout:eggs}
@@ -29,3 +34,4 @@
     z3c.coverage
 scripts = coveragereport
 arguments = ('coverage', 'coverage/report')
+

Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -363,6 +363,10 @@
         a list of (oid, tid) that have changed, or None to indicate
         that the changes are too complex to list.  new_polled_tid is
         never None.
+
+        This method may raise ReadConflictError if the database has
+        reverted to an earlier transaction, which can happen
+        in an asynchronously replicated database.
         """
 
     def list_changes(cursor, after_tid, last_tid):
@@ -466,4 +470,3 @@
 
 class ReplicaClosedException(Exception):
     """The connection to the replica has been closed"""
-

Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -129,7 +129,8 @@
             poll_query=poll_query,
             keep_history=self.keep_history,
             runner=self.runner,
-            )
+            revert_when_stale=options.revert_when_stale,
+        )
 
         if self.keep_history:
             self.packundo = MySQLHistoryPreservingPackUndo(

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -124,7 +124,8 @@
             poll_query=poll_query,
             keep_history=self.keep_history,
             runner=self.runner,
-            )
+            revert_when_stale=options.revert_when_stale,
+        )
 
         if self.keep_history:
             self.packundo = OracleHistoryPreservingPackUndo(

Modified: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/poller.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -12,19 +12,23 @@
 #
 ##############################################################################
 
+from ZODB.POSException import ReadConflictError
 from relstorage.adapters.interfaces import IPoller
 from zope.interface import implements
 import logging
+
 log = logging.getLogger(__name__)
 
+
 class Poller:
     """Database change notification poller"""
     implements(IPoller)
 
-    def __init__(self, poll_query, keep_history, runner):
+    def __init__(self, poll_query, keep_history, runner, revert_when_stale):
         self.poll_query = poll_query
         self.keep_history = keep_history
         self.runner = runner
+        self.revert_when_stale = revert_when_stale
 
     def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
         """Polls for new transactions.
@@ -59,28 +63,30 @@
             # No transactions have been committed since prev_polled_tid.
             return (), new_polled_tid
 
-        if self.keep_history:
-            # If the previously polled transaction no longer exists,
-            # the cache is too old and needs to be cleared.
-            # XXX Do we actually need to detect this condition? I think
-            # if we delete this block of code, all the unreachable
-            # objects will be garbage collected anyway. So, as a test,
-            # there is no equivalent of this block of code for
-            # history-free storage. If something goes wrong, then we'll
-            # know there's some other edge condition we have to account
-            # for.
-            stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
-            cursor.execute(intern(stmt % self.runner.script_vars),
-                {'tid': prev_polled_tid})
-            rows = cursor.fetchall()
-            if not rows:
-                # Transaction not found; perhaps it has been packed.
-                # The connection cache needs to be cleared.
-                return None, new_polled_tid
+        elif new_polled_tid > prev_polled_tid:
+            # New transaction(s) have been added.
 
-        # Get the list of changed OIDs and return it.
-        if new_polled_tid > prev_polled_tid:
             if self.keep_history:
+                # If the previously polled transaction no longer exists,
+                # the cache is too old and needs to be cleared.
+                # XXX Do we actually need to detect this condition? I think
+                # if we delete this block of code, all the unreachable
+                # objects will be garbage collected anyway. So, as a test,
+                # there is no equivalent of this block of code for
+                # history-free storage. If something goes wrong, then we'll
+                # know there's some other edge condition we have to account
+                # for.
+                stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
+                cursor.execute(intern(stmt % self.runner.script_vars),
+                    {'tid': prev_polled_tid})
+                rows = cursor.fetchall()
+                if not rows:
+                    # Transaction not found; perhaps it has been packed.
+                    # The connection cache should be cleared.
+                    return None, new_polled_tid
+
+            # Get the list of changed OIDs and return it.
+            if self.keep_history:
                 stmt = """
                 SELECT zoid, tid
                 FROM current_object
@@ -104,21 +110,28 @@
             return changes, new_polled_tid
 
         else:
-            # We moved backward in time. This can happen after failover
-            # to an asynchronous slave that is not fully up to date. If
-            # this was not caused by failover, this condition suggests that
-            # transaction IDs are not being created in order, which can
-            # lead to consistency violations.
-            log.warning(
-                "Detected backward time travel (old tid %d, new tid %d). "
-                "This is acceptable if it was caused by failover to a "
-                "read-only asynchronous slave, but otherwise it may "
-                "indicate a problem.",
-                prev_polled_tid, new_polled_tid)
-            # Although we could handle this situation by looking at the
-            # whole cPickleCache and invalidating only certain objects,
-            # invalidating the whole cache is simpler.
-            return None, new_polled_tid
+            # The database connection is stale. This can happen after
+            # reading an asynchronous slave that is not fully up to date.
+            # (It may also suggest that transaction IDs are not being created
+            # in order, which would be a serious bug leading to consistency
+            # violations.)
+            if self.revert_when_stale:
+                # This client prefers to revert to the old state.
+                log.warning(
+                    "Reverting to stale transaction ID %d and clearing cache. "
+                    "(prev_polled_tid=%d)",
+                    new_polled_tid, prev_polled_tid)
+                # We have to invalidate the whole cPickleCache, otherwise
+                # the cache would be inconsistent with the reverted state.
+                return None, new_polled_tid
+            else:
+                # This client never wants to revert to stale data, so
+                # raise ReadConflictError to trigger a retry.
+                # We're probably just waiting for async replication
+                # to catch up, so retrying could do the trick.
+                raise ReadConflictError(
+                    "The database connection is stale: new_polled_tid=%d, "
+                    "prev_polled_tid=%d." % (new_polled_tid, prev_polled_tid))
 
     def list_changes(self, cursor, after_tid, last_tid):
         """Return the (oid, tid) values changed in a range of transactions.

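Condensed, the control flow that `poll_invalidations` now follows is a three-way comparison of transaction IDs. A self-contained sketch, with `ReadConflictError` stubbed and the "list the changed oids" step reduced to a tag:

```python
class ReadConflictError(Exception):
    """Stand-in for ZODB.POSException.ReadConflictError."""

def poll_decision(prev_polled_tid, new_polled_tid, revert_when_stale):
    """Mirror the three-way branch poll_invalidations now takes."""
    if new_polled_tid == prev_polled_tid:
        # No transactions have been committed since the last poll.
        return (), new_polled_tid
    elif new_polled_tid > prev_polled_tid:
        # Normal case: list the changed oids (a None result would
        # mean "cache too old, clear it").
        return "changes", new_polled_tid
    elif revert_when_stale:
        # Stale replica, but this client prefers to revert:
        # clear the whole cache and accept the older tid.
        return None, new_polled_tid
    else:
        # Stale replica: refuse to serve old data; the application
        # should retry while replication catches up.
        raise ReadConflictError(
            "The database connection is stale: new_polled_tid=%d, "
            "prev_polled_tid=%d." % (new_polled_tid, prev_polled_tid))
```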
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -94,7 +94,8 @@
             poll_query="EXECUTE get_latest_tid",
             keep_history=self.keep_history,
             runner=self.runner,
-            )
+            revert_when_stale=options.revert_when_stale,
+        )
 
         if self.keep_history:
             self.packundo = HistoryPreservingPackUndo(

Modified: relstorage/trunk/relstorage/cache.py
===================================================================
--- relstorage/trunk/relstorage/cache.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/cache.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -467,13 +467,17 @@
                 new_checkpoints = (new_tid_int, new_tid_int)
             allow_shift = False
 
+        # We want to keep the current checkpoints for speed, but we
+        # have to replace them (to avoid consistency violations)
+        # if certain conditions happen (like emptying the ZODB cache).
         if (new_checkpoints == self.checkpoints
                 and changes is not None
                 and prev_tid_int
                 and prev_tid_int <= self.current_tid
                 and new_tid_int >= self.current_tid
                 ):
-            # Keep the checkpoints and update self.delta_after0.
+            # All the conditions for keeping the checkpoints were met,
+            # so just update self.delta_after0 and self.current_tid.
             m = self.delta_after0
             m_get = m.get
             for oid_int, tid_int in changes:
@@ -482,6 +486,7 @@
                     m[oid_int] = tid_int
             self.current_tid = new_tid_int
         else:
+            # We have to replace the checkpoints.
             cp0, cp1 = new_checkpoints
             log.debug("Using new checkpoints: %d %d", cp0, cp1)
             # Use the checkpoints specified by the cache.
@@ -491,20 +496,20 @@
             if cp1 < new_tid_int:
                 # poller.list_changes provides an iterator of
                 # (oid, tid) where tid > after_tid and tid <= last_tid.
-                changes = self.adapter.poller.list_changes(
+                change_list = self.adapter.poller.list_changes(
                     cursor, cp1, new_tid_int)
 
                 # Make a dictionary that contains, for each oid, the most
                 # recent tid listed in changes.
-                changes_dict = {}
-                if not isinstance(changes, list):
-                    changes = list(changes)
-                changes.sort()
-                for oid_int, tid_int in changes:
-                    changes_dict[oid_int] = tid_int
+                change_dict = {}
+                if not isinstance(change_list, list):
+                    change_list = list(change_list)
+                change_list.sort()
+                for oid_int, tid_int in change_list:
+                    change_dict[oid_int] = tid_int
 
                 # Put the changes in new_delta_after*.
-                for oid_int, tid_int in changes_dict.iteritems():
+                for oid_int, tid_int in change_dict.iteritems():
                     if tid_int > cp0:
                         new_delta_after0[oid_int] = tid_int
                     elif tid_int > cp1:

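The renamed variables in cache.py implement a small reduction: collapse an iterator of (oid, tid) pairs to the most recent tid per oid. In isolation (sorting first so that later tids win, as the diff does):

```python
def most_recent_changes(change_list):
    """Collapse (oid, tid) pairs to the most recent tid for each oid."""
    if not isinstance(change_list, list):
        change_list = list(change_list)
    change_list.sort()
    change_dict = {}
    for oid_int, tid_int in change_list:
        # After sorting, a later (higher) tid overwrites an earlier one.
        change_dict[oid_int] = tid_int
    return change_dict
```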
Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/component.xml	2011-09-30 09:56:16 UTC (rev 123000)
@@ -41,6 +41,9 @@
     <key name="replica-timeout" datatype="float" default="600.0">
       <description>See the RelStorage README.txt file.</description>
     </key>
+    <key name="revert-when-stale" datatype="boolean" default="false">

+      <description>See the RelStorage README.txt file.</description>
+    </key>
     <key name="poll-interval" datatype="float" required="no">
       <description>See the RelStorage README.txt file.</description>
     </key>

Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/options.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -47,6 +47,7 @@
         self.replica_conf = None
         self.ro_replica_conf = None
         self.replica_timeout = 600.0
+        self.revert_when_stale = False
         self.poll_interval = 0
         self.pack_gc = True
         self.pack_prepack_only = False

Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/storage.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -16,21 +16,22 @@
 Stores pickles in the database.
 """
 
-from persistent.TimeStamp import TimeStamp
-from relstorage.blobhelper import BlobHelper
-from relstorage.blobhelper import is_blob_record
-from relstorage.cache import StorageCache
-from relstorage.options import Options
+from ZODB import ConflictResolution
+from ZODB import POSException
 from ZODB.BaseStorage import DataRecord
 from ZODB.BaseStorage import TransactionRecord
-from ZODB import ConflictResolution
-from ZODB import POSException
 from ZODB.FileStorage import FileIterator
 from ZODB.POSException import POSKeyError
 from ZODB.UndoLogCompatible import UndoLogCompatible
 from ZODB.utils import p64
 from ZODB.utils import u64
+from persistent.TimeStamp import TimeStamp
+from relstorage.blobhelper import BlobHelper
+from relstorage.blobhelper import is_blob_record
+from relstorage.cache import StorageCache
+from relstorage.options import Options
 from zope.interface import implements
+import ZODB.interfaces
 import base64
 import cPickle
 import logging
@@ -39,7 +40,6 @@
 import threading
 import time
 import weakref
-import ZODB.interfaces
 
 try:
     from ZODB.interfaces import StorageStopIteration
@@ -143,6 +143,10 @@
     # calling the database.
     _batcher_row_limit = 100
 
+    # _stale_error is None most of the time.  It's a ReadConflictError
+    # when the database connection is stale (due to async replication).
+    _stale_error = None
+
     def __init__(self, adapter, name=None, create=None,
             options=None, cache=None, blobhelper=None, **kwoptions):
         self._adapter = adapter
@@ -328,7 +332,7 @@
         self._cache.clear()
 
     def release(self):
-        """Release back end database sessions used by this storage instance.
+        """Release database sessions used by this storage instance.
         """
         self._lock_acquire()
         try:
@@ -446,6 +450,9 @@
         logfunc('; '.join(msg))
 
     def load(self, oid, version=''):
+        if self._stale_error is not None:
+            raise self._stale_error
+
         oid_int = u64(oid)
         cache = self._cache
 
@@ -471,7 +478,10 @@
             raise POSKeyError(oid)
 
     def getTid(self, oid):
-        state, serial = self.load(oid, '')
+        if self._stale_error is not None:
+            raise self._stale_error
+
+        _state, serial = self.load(oid, '')
         return serial
 
     getSerial = getTid  # ZODB 3.7
@@ -510,6 +520,9 @@
 
     def loadBefore(self, oid, tid):
         """Return the most recent revision of oid before tid committed."""
+        if self._stale_error is not None:
+            raise self._stale_error
+
         oid_int = u64(oid)
 
         self._lock_acquire()
@@ -543,6 +556,8 @@
             self._lock_release()
 
     def store(self, oid, serial, data, version, transaction):
+        if self._stale_error is not None:
+            raise self._stale_error
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         if transaction is not self._transaction:
@@ -580,6 +595,8 @@
         # Like store(), but used for importing transactions.  See the
         # comments in FileStorage.restore().  The prev_txn optimization
         # is not used.
+        if self._stale_error is not None:
+            raise self._stale_error
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         if transaction is not self._transaction:
@@ -606,6 +623,8 @@
             self._lock_release()
 
     def checkCurrentSerialInTransaction(self, oid, serial, transaction):
+        if self._stale_error is not None:
+            raise self._stale_error
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
 
@@ -626,6 +645,8 @@
         self._txn_check_serials[oid] = serial
 
     def tpc_begin(self, transaction, tid=None, status=' '):
+        if self._stale_error is not None:
+            raise self._stale_error
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         self._lock_acquire()
@@ -916,6 +937,8 @@
             self._lock_release()
 
     def new_oid(self):
+        if self._stale_error is not None:
+            raise self._stale_error
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         self._lock_acquire()
@@ -950,6 +973,8 @@
         return self._adapter.keep_history
 
     def undoLog(self, first=0, last=-20, filter=None):
+        if self._stale_error is not None:
+            raise self._stale_error
         if last < 0:
             last = first - last
 
@@ -980,6 +1005,8 @@
             adapter.connmanager.close(conn, cursor)
 
     def history(self, oid, version=None, size=1, filter=None):
+        if self._stale_error is not None:
+            raise self._stale_error
         self._lock_acquire()
         try:
             cursor = self._load_cursor
@@ -1020,6 +1047,8 @@
         the transaction.
         """
 
+        if self._stale_error is not None:
+            raise self._stale_error
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         if transaction is not self._transaction:
@@ -1199,9 +1228,19 @@
         prev = self._prev_polled_tid
 
         # get a list of changed OIDs and the most recent tid
-        changes, new_polled_tid = self._restart_load_and_call(
-            self._adapter.poller.poll_invalidations, prev, ignore_tid)
+        try:
+            changes, new_polled_tid = self._restart_load_and_call(
+                self._adapter.poller.poll_invalidations, prev, ignore_tid)
+        except POSException.ReadConflictError, e:
+            # The database connection is stale, but postpone this
+            # error until the application tries to read or write something.
+            self._stale_error = e
+            # Always poll (override the poll_interval option).
+            self._poll_at = 0
+            return None, prev
 
+        self._stale_error = None
+
         # Inform the cache of the changes.
         self._cache.after_poll(
             self._load_cursor, prev, new_polled_tid, changes)
@@ -1329,7 +1368,7 @@
         elif isinstance(other, FileIterator):
             # Create copy and ask for that for it's length so we do not
             # exhaust the original iterator
-            copy = FileIterator(other._file_name, other._start, other._stop, 
+            copy = FileIterator(other._file_name, other._start, other._stop,
                 other._pos)
             num_txns = len(list(copy))
         else:

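The storage.py changes all follow one pattern: `_restart_load_and_call` records the ReadConflictError instead of letting it propagate, and every read/write entry point re-raises it later. A toy model of that deferral (class and method names hypothetical, polling reduced to a flag):

```python
class ReadConflictError(Exception):
    """Stand-in for ZODB.POSException.ReadConflictError."""

class StaleAwareStorage(object):
    # None most of the time; a ReadConflictError while the
    # database connection is stale.
    _stale_error = None

    def poll(self, replica_is_stale):
        """Stands in for the error handling around poll_invalidations."""
        if replica_is_stale:
            # Postpone the error until the application touches the storage.
            self._stale_error = ReadConflictError("stale connection")
        else:
            self._stale_error = None

    def load(self, oid):
        if self._stale_error is not None:
            raise self._stale_error
        return "state-of-%s" % (oid,)

storage = StaleAwareStorage()
storage.poll(replica_is_stale=True)
```

Once a later poll succeeds, `_stale_error` is cleared and loads proceed normally, which is exactly the "a subsequent transaction will no longer be stale" behavior from the log message.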
Modified: relstorage/trunk/relstorage/tests/hftestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/hftestbase.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/hftestbase.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -257,8 +257,7 @@
     keep_history = False
 
     def setUp(self):
-        self.open(create=1)
-        self._storage.zap_all()
+        self._storage = self.make_storage()
         self._dst = FileStorage("Dest.fs", create=True)
 
     def tearDown(self):
@@ -279,8 +278,6 @@
     keep_history = False
 
     def setUp(self):
-        self.open(create=1)
-        self._storage.zap_all()
         self._dst = self._storage
         self._storage = FileStorage("Source.fs", create=True)
 

Modified: relstorage/trunk/relstorage/tests/hptestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/hptestbase.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/hptestbase.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -193,15 +193,15 @@
         return oid
 
     def checkPackGCDisabled(self):
-        self._storage._adapter.packundo.options.pack_gc = False
+        self._storage = self.make_storage(pack_gc=False)
         self.checkPackGC(expect_object_deleted=False)
 
     def checkPackGCPrePackOnly(self):
-        self._storage._options.pack_prepack_only = True
+        self._storage = self.make_storage(pack_prepack_only=True)
         self.checkPackGC(expect_object_deleted=False)
 
     def checkPackGCReusePrePackData(self):
-        self._storage._options.pack_prepack_only = True
+        self._storage = self.make_storage(pack_prepack_only=True)
         oid = self.checkPackGC(expect_object_deleted=False)
         # We now have pre-pack analysis data
         self._storage._options.pack_prepack_only = False
@@ -243,13 +243,12 @@
 class HistoryPreservingToFileStorage(
         RelStorageTestBase,
         UndoableRecoveryStorage,
-        ):
+    ):
 
     keep_history = True
 
     def setUp(self):
-        self.open(create=1)
-        self._storage.zap_all()
+        self._storage = self.make_storage()
         self._dst = FileStorage("Dest.fs", create=True)
 
     def tearDown(self):
@@ -265,14 +264,12 @@
 class HistoryPreservingFromFileStorage(
         RelStorageTestBase,
         UndoableRecoveryStorage,
-        ):
+    ):
 
     keep_history = True
 
     def setUp(self):
-        self.open(create=1)
-        self._storage.zap_all()
-        self._dst = self._storage
+        self._dst = self.make_storage()
         self._storage = FileStorage("Source.fs", create=True)
 
     def tearDown(self):

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -13,10 +13,8 @@
 ##############################################################################
 """A foundation for RelStorage tests"""
 
-from persistent import Persistent
-from persistent.mapping import PersistentMapping
-from relstorage.tests import fakecache
 from ZODB.DB import DB
+from ZODB.POSException import ReadConflictError
 from ZODB.serialize import referencesf
 from ZODB.tests import BasicStorage
 from ZODB.tests import ConflictResolution
@@ -26,10 +24,11 @@
 from ZODB.tests import ReadOnlyStorage
 from ZODB.tests import StorageTestBase
 from ZODB.tests import Synchronization
-from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_pickle
 from ZODB.tests.StorageTestBase import zodb_unpickle
-from ZODB.utils import p64
+from persistent import Persistent
+from persistent.mapping import PersistentMapping
+from relstorage.tests import fakecache
 import random
 import time
 import transaction
@@ -37,26 +36,61 @@
 
 class RelStorageTestBase(StorageTestBase.StorageTestBase):
 
-    def make_adapter(self):
-        # abstract method
-        raise NotImplementedError
+    keep_history = None  # Override
+    _storage_created = None
 
-    def open(self, **kwargs):
-        from relstorage.storage import RelStorage
-        adapter = self.make_adapter()
-        self._storage = RelStorage(adapter, **kwargs)
-        self._storage._batcher_row_limit = 1
-
     def setUp(self):
-        self.open(create=1)
-        self._storage.zap_all()
+        pass
 
     def tearDown(self):
         transaction.abort()
-        self._storage.close()
-        self._storage.cleanup()
+        storage = self._storage
+        if storage is not None:
+            self._storage = None
+            storage.close()
+            storage.cleanup()
 
+    def get_storage(self):
+        # Create a storage with default options
+        # if it has not been created already.
+        storage = self._storage_created
+        if storage is None:
+            storage = self.make_storage()
+            self._storage_created = storage
+        return storage
 
+    def set_storage(self, storage):
+        self._storage_created = storage
+
+    _storage = property(get_storage, set_storage)
+
+    def make_adapter(self, options):
+        # abstract method
+        raise NotImplementedError()
+
+    def make_storage(self, zap=True, **kw):
+        from relstorage.options import Options
+        from relstorage.storage import RelStorage
+        options = Options(keep_history=self.keep_history, **kw)
+        adapter = self.make_adapter(options)
+        storage = RelStorage(adapter, options=options)
+        storage._batcher_row_limit = 1
+        if zap:
+            storage.zap_all()
+        return storage
+
+    def open(self, read_only=False):
+        # This is used by a few ZODB tests that close and reopen the storage.
+        storage = self._storage
+        if storage is not None:
+            self._storage = None
+            storage.close()
+            storage.cleanup()
+        self._storage = storage = self.make_storage(
+            read_only=read_only, zap=False)
+        return storage
+
+
 class GenericRelStorageTests(
     RelStorageTestBase,
     BasicStorage.BasicStorage,
@@ -248,15 +282,18 @@
 
     def checkUseCache(self):
         # Store an object, cache it, then retrieve it from the cache
-        self._storage._options.cache_servers = 'x:1 y:2'
-        self._storage._options.cache_module_name = fakecache.__name__
-        self._storage._options.cache_prefix = 'zzz'
+        self._storage = self.make_storage(
+            cache_servers='x:1 y:2',
+            cache_module_name=fakecache.__name__,
+            cache_prefix='zzz',
+        )
 
         fakecache.data.clear()
         db = DB(self._storage)
         try:
             c1 = db.open()
-            self.assert_(c1._storage._cache.clients_global_first[0].servers,
+            self.assertEqual(
+                c1._storage._cache.clients_global_first[0].servers,
                 ['x:1', 'y:2'])
             r1 = c1.root()
             # The root state and checkpoints should now be cached.
@@ -337,7 +374,9 @@
     def checkPollInterval(self, shared_cache=True):
         # Verify the poll_interval parameter causes RelStorage to
         # delay invalidation polling.
-        self._storage._options.poll_interval = 3600
+        self._storage = self.make_storage(
+            poll_interval=3600, share_local_cache=shared_cache)
+
         db = DB(self._storage)
         try:
             tm1 = transaction.TransactionManager()
@@ -386,12 +425,12 @@
             db.close()
 
     def checkPollIntervalWithUnsharedCache(self):
-        self._storage._options.share_local_cache = False
         self.checkPollInterval(shared_cache=False)
 
     def checkCachePolling(self):
-        self._storage._options.poll_interval = 3600
-        self._storage._options.share_local_cache = False
+        self._storage = self.make_storage(
+            poll_interval=3600, share_local_cache=False)
+
         db = DB(self._storage)
         try:
             # Set up the database.
@@ -481,7 +520,8 @@
     def checkPackBatchLockNoWait(self):
         # Exercise the code in the pack algorithm that attempts to get the
         # commit lock but will sleep if the lock is busy.
-        self._storage._adapter.packundo.options.pack_batch_timeout = 0
+        self._storage = self.make_storage(pack_batch_timeout=0)
+
         adapter = self._storage._adapter
         test_conn, test_cursor = adapter.connmanager.open()
 
@@ -592,13 +632,14 @@
         self.assertRaises(UnpicklingError, self._storage.pack,
             time.time() + 10000, referencesf)
 
-    def checkBackwardTimeTravel(self):
-        # When a failover event causes the storage to switch to an
-        # asynchronous slave that is not fully up to date, the poller
+    def checkBackwardTimeTravelWithoutRevertWhenStale(self):
+        # If revert_when_stale is false (the default), when the database
+        # connection is stale (such as through failover to an
+        # asynchronous slave that is not fully up to date), the poller
         # should notice that backward time travel has occurred and
-        # handle the situation by invalidating all objects that have
-        # changed in the interval. (Currently, we simply invalidate all
-        # objects when backward time travel occurs.)
+        # raise a ReadConflictError.
+        self._storage = self.make_storage(revert_when_stale=False)
+
         import os
         import shutil
         import tempfile
@@ -630,6 +671,55 @@
             finally:
                 shutil.rmtree(d)
 
+            # Sync, which will call poll_invalidations().
+            c.sync()
+
+            # Try to load an object, which should cause ReadConflictError.
+            r._p_deactivate()
+            self.assertRaises(ReadConflictError, lambda: r['beta'])
+
+        finally:
+            db.close()
+
+    def checkBackwardTimeTravelWithRevertWhenStale(self):
+        # If revert_when_stale is true, when the database
+        # connection is stale (such as through failover to an
+        # asynchronous slave that is not fully up to date), the poller
+        # should notice that backward time travel has occurred and
+        # invalidate all objects that have changed in the interval.
+        self._storage = self.make_storage(revert_when_stale=True)
+
+        import os
+        import shutil
+        import tempfile
+        from ZODB.FileStorage import FileStorage
+        db = DB(self._storage)
+        try:
+            c = db.open()
+            r = c.root()
+            r['alpha'] = PersistentMapping()
+            transaction.commit()
+
+            # To simulate failover to an out of date async slave, take
+            # a snapshot of the database at this point, change some
+            # object, then restore the database to its earlier state.
+
+            d = tempfile.mkdtemp()
+            try:
+                fs = FileStorage(os.path.join(d, 'Data.fs'))
+                fs.copyTransactionsFrom(c._storage)
+
+                r['beta'] = PersistentMapping()
+                transaction.commit()
+                self.assertTrue('beta' in r)
+
+                c._storage.zap_all()
+                c._storage.copyTransactionsFrom(fs)
+
+                fs.close()
+            finally:
+                shutil.rmtree(d)
+
             # r should still be in the cache.
             self.assertTrue('beta' in r)
 

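The refactoring above replaces an eagerly created `self._storage` with a lazily built one behind a property, so each test can either use a default storage or install one built with custom options via `make_storage(**kw)`. A minimal standalone sketch of that pattern (the `FakeStorage` and `TestBase` names are illustrative stand-ins, not RelStorage classes):

```python
class FakeStorage:
    """Stand-in for RelStorage; records the options it was built with."""
    def __init__(self, **options):
        self.options = options
        self.closed = False

    def close(self):
        self.closed = True


class TestBase:
    _storage_created = None  # per-instance once assigned

    def make_storage(self, **kw):
        return FakeStorage(**kw)

    def get_storage(self):
        # Lazily create a storage with default options on first access.
        if self._storage_created is None:
            self._storage_created = self.make_storage()
        return self._storage_created

    def set_storage(self, storage):
        # A test may install a storage built with non-default options.
        self._storage_created = storage

    _storage = property(get_storage, set_storage)


t = TestBase()
default = t._storage                              # created with defaults
t._storage = t.make_storage(poll_interval=3600)   # test-specific override
custom = t._storage
```

This is why tests such as `checkPollInterval` above can simply assign `self._storage = self.make_storage(poll_interval=3600)` instead of mutating `_options` on an already-created storage.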
Modified: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/testmysql.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -29,18 +29,19 @@
 
 
 class UseMySQLAdapter:
-    def make_adapter(self):
+
+    def make_adapter(self, options):
         from relstorage.adapters.mysql import MySQLAdapter
         if self.keep_history:
             db = base_dbname
         else:
             db = base_dbname + '_hf'
         return MySQLAdapter(
-            options=Options(keep_history=self.keep_history),
+            options=options,
             db=db,
             user='relstoragetest',
             passwd='relstoragetest',
-            )
+        )
 
 
 class ZConfigTests:

Modified: relstorage/trunk/relstorage/tests/testoracle.py
===================================================================
--- relstorage/trunk/relstorage/tests/testoracle.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/testoracle.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -30,7 +30,8 @@
 
 
 class UseOracleAdapter:
-    def make_adapter(self):
+
+    def make_adapter(self, options):
         from relstorage.adapters.oracle import OracleAdapter
         dsn = os.environ.get('ORACLE_TEST_DSN', 'XE')
         if self.keep_history:
@@ -41,8 +42,8 @@
             user=db,
             password='relstoragetest',
             dsn=dsn,
-            options=Options(keep_history=self.keep_history),
-            )
+            options=options,
+        )
 
 
 class ZConfigTests:

Modified: relstorage/trunk/relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testpostgresql.py	2011-09-30 00:58:26 UTC (rev 122999)
+++ relstorage/trunk/relstorage/tests/testpostgresql.py	2011-09-30 09:56:16 UTC (rev 123000)
@@ -29,7 +29,8 @@
 
 
 class UsePostgreSQLAdapter:
-    def make_adapter(self):
+
+    def make_adapter(self, options):
         from relstorage.adapters.postgresql import PostgreSQLAdapter
         if self.keep_history:
             db = base_dbname
@@ -37,8 +38,8 @@
             db = base_dbname + '_hf'
         return PostgreSQLAdapter(
             dsn='dbname=%s user=relstoragetest password=relstoragetest' % db,
-            options=Options(keep_history=self.keep_history),
-            )
+            options=options,
+        )
 
 
 class ZConfigTests:

Added: relstorage/trunk/repltest/master/notes
===================================================================
--- relstorage/trunk/repltest/master/notes	                        (rev 0)
+++ relstorage/trunk/repltest/master/notes	2011-09-30 09:56:16 UTC (rev 123000)
@@ -0,0 +1,6 @@
+CREATE DATABASE plone;
+GRANT ALL ON plone.* TO 'plone'@'%' IDENTIFIED BY 'plonepass';
+GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%' IDENTIFIED BY 'slavepass';
+FLUSH PRIVILEGES;
+RESET MASTER;
+SHOW MASTER STATUS;

Added: relstorage/trunk/repltest/slave/notes
===================================================================
--- relstorage/trunk/repltest/slave/notes	                        (rev 0)
+++ relstorage/trunk/repltest/slave/notes	2011-09-30 09:56:16 UTC (rev 123000)
@@ -0,0 +1,11 @@
+CREATE DATABASE plone;
+GRANT ALL ON plone.* TO 'plone'@'%' IDENTIFIED BY 'plonepass';
+FLUSH PRIVILEGES;
+CHANGE MASTER TO
+    MASTER_HOST='192.168.1.61',
+    MASTER_USER='repl',
+    MASTER_PASSWORD='slavepass',
+    MASTER_LOG_FILE='mysql-bin.000001',
+    MASTER_LOG_POS=4,
+    MASTER_CONNECT_RETRY=10;
+START SLAVE;


