[Zodb-checkins] SVN: ZODB/branches/3.7/src/Z Fixed the "potential ZODB cache inconsistency after client reconnect"

Jim Fulton jim at zope.com
Tue Aug 15 19:35:22 EDT 2006


Log message for revision 69549:
  Fixed the "potential ZODB cache inconsistency after client reconnect"
  problem reported on zodb-dev:
  
  http://mail.zope.org/pipermail/zodb-dev/2006-August/010343.html
  
  Added a new invalidateCache protocol for DBs and Connections to
  invalidate the entire in-memory caches.  This is used when ZEO clients
  reconnect.
  

Changed:
  U   ZODB/branches/3.7/src/ZEO/ClientStorage.py
  U   ZODB/branches/3.7/src/ZEO/tests/testZEO.py
  U   ZODB/branches/3.7/src/ZODB/Connection.py
  U   ZODB/branches/3.7/src/ZODB/DB.py
  U   ZODB/branches/3.7/src/ZODB/interfaces.py
  U   ZODB/branches/3.7/src/ZODB/tests/testConnection.py
  U   ZODB/branches/3.7/src/ZODB/tests/testDB.py

-=-
Modified: ZODB/branches/3.7/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/3.7/src/ZEO/ClientStorage.py	2006-08-15 21:36:17 UTC (rev 69548)
+++ ZODB/branches/3.7/src/ZEO/ClientStorage.py	2006-08-15 23:35:21 UTC (rev 69549)
@@ -460,6 +460,10 @@
             # this method before it was stopped.
             return
 
+        # invalidate our db cache
+        if self._db is not None:
+            self._db.invalidateCache()
+
         # TODO:  report whether we get a read-only connection.
         if self._connection is not None:
             reconnect = 1

Modified: ZODB/branches/3.7/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/3.7/src/ZEO/tests/testZEO.py	2006-08-15 21:36:17 UTC (rev 69548)
+++ ZODB/branches/3.7/src/ZEO/tests/testZEO.py	2006-08-15 23:35:21 UTC (rev 69549)
@@ -306,6 +306,53 @@
         self.assert_('exc_info' in log[0][1])
         self.assertEqual(log[1][0], "Couldn't close a dispatcher.")
         self.assert_('exc_info' in log[1][1])
+
+class ConnectionInvalidationOnReconnect(
+    ZEO.tests.ConnectionTests.CommonSetupTearDown):
+    """Test what happens when the client loop falls over
+    """
+
+    def getConfig(self, path, create, read_only):
+        return """<mappingstorage 1/>"""
+
+    def checkConnectionInvalidationOnReconnect(self):
+
+        storage = ClientStorage(self.addr, wait=1, min_disconnect_poll=0.1)
+        self._storage = storage
+
+        # and we'll wait for the storage to be reconnected:
+        for i in range(100):
+            if storage.is_connected():
+                break
+            time.sleep(0.1)
+        else:
+            raise AssertionError("Couldn't connect to server")
+
+        class DummyDB:
+            _invalidatedCache = 0
+            def invalidateCache(self):
+                self._invalidatedCache += 1
+            def invalidate(*a, **k):
+                pass
+                
+        db = DummyDB()
+        storage.registerDB(db, None)
+
+        base = db._invalidatedCache
+
+        # Now we'll force a disconnection and reconnection
+        storage._connection.close()
+
+        # and we'll wait for the storage to be reconnected:
+        for i in range(100):
+            if storage.is_connected():
+                break
+            time.sleep(0.1)
+        else:
+            raise AssertionError("Couldn't connect to server")
+
+        # Now, the root object in the connection should have been invalidated:
+        self.assertEqual(db._invalidatedCache, base+1)
     
 
 class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
@@ -345,6 +392,7 @@
                 DemoStorageWrappedAroundClientStorage,
                 HeartbeatTests,
                 CatastrophicClientLoopFailure,
+                ConnectionInvalidationOnReconnect,
                ]
 
 def test_suite():

Modified: ZODB/branches/3.7/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/3.7/src/ZODB/Connection.py	2006-08-15 21:36:17 UTC (rev 69548)
+++ ZODB/branches/3.7/src/ZODB/Connection.py	2006-08-15 23:35:21 UTC (rev 69549)
@@ -130,8 +130,11 @@
         # critical sections (if any -- this needs careful thought).
 
         self._inv_lock = threading.Lock()
-        self._invalidated = d = {}
+        self._invalidated = {}
 
+        # Flag indicating whether the cache has been invalidated:
+        self._invalidatedCache = False
+
         # We intend to prevent committing a transaction in which
         # ReadConflictError occurs.  _conflicts is the set of oids that
         # experienced ReadConflictError.  Any time we raise ReadConflictError,
@@ -288,6 +291,14 @@
         finally:
             self._inv_lock.release()
 
+    def invalidateCache(self):
+        self._inv_lock.acquire()
+        try:
+            self._invalidatedCache = True
+        finally:
+            self._inv_lock.release()
+        
+
     def root(self):
         """Return the database root object."""
         return self.get(z64)
@@ -450,6 +461,9 @@
             invalidated = self._invalidated
             self._invalidated = {}
             self._txn_time = None
+            if self._invalidatedCache:
+                self._invalidatedCache = False
+                invalidated = self._cache.cache_data.copy()
         finally:
             self._inv_lock.release()
 
@@ -501,6 +515,9 @@
 
         self._added_during_commit = []
 
+        if self._invalidatedCache:
+            raise ConflictError()            
+
         for obj in self._registered_objects:
             oid = obj._p_oid
             assert oid
@@ -759,6 +776,10 @@
         # dict update could go on in another thread, but we don't care
         # because we have to check again after the load anyway.
 
+
+        if self._invalidatedCache:
+            raise ReadConflictError()            
+
         if (obj._p_oid in self._invalidated and
                 not myhasattr(obj, "_p_independent")):
             # If the object has _p_independent(), we will handle it below.
@@ -947,6 +968,7 @@
         """
         self._reset_counter = global_reset_counter
         self._invalidated.clear()
+        self._invalidatedCache = False
         cache_size = self._cache.cache_size
         self._cache = cache = PickleCache(self, cache_size)
 

Modified: ZODB/branches/3.7/src/ZODB/DB.py
===================================================================
--- ZODB/branches/3.7/src/ZODB/DB.py	2006-08-15 21:36:17 UTC (rev 69548)
+++ ZODB/branches/3.7/src/ZODB/DB.py	2006-08-15 23:35:21 UTC (rev 69549)
@@ -480,6 +480,12 @@
                 c.invalidate(tid, oids)
         self._connectionMap(inval)
 
+    def invalidateCache(self):
+        """Invalidate each of the connection caches
+        """        
+        self._miv_cache.clear()
+        self._connectionMap(lambda c: c.invalidateCache())
+
     def modifiedInVersion(self, oid):
         h = hash(oid) % 131
         cache = self._miv_cache

Modified: ZODB/branches/3.7/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/3.7/src/ZODB/interfaces.py	2006-08-15 21:36:17 UTC (rev 69548)
+++ ZODB/branches/3.7/src/ZODB/interfaces.py	2006-08-15 23:35:21 UTC (rev 69549)
@@ -283,40 +283,51 @@
         If clear is True, reset the counters.
         """
 
+    def invalidateCache():
+        """Invalidate the connection cache
+
+        This invalidates *all* objects in the cache. If the connection
+        is open, subsequent reads will fail until a new transaction
+        begins or until the connection os reopned.
+        
+        """
+
 class IDatabase(Interface):
     """ZODB DB.
 
     TODO: This interface is incomplete.
     """
 
-    def __init__(storage,
-                 pool_size=7,
-                 cache_size=400,
-                 version_pool_size=3,
-                 version_cache_size=100,
-                 database_name='unnamed',
-                 databases=None,
-                 ):
-        """Create an object database.
+## __init__ methods don't belong in interfaces:
+##
+##     def __init__(storage,
+##                  pool_size=7,
+##                  cache_size=400,
+##                  version_pool_size=3,
+##                  version_cache_size=100,
+##                  database_name='unnamed',
+##                  databases=None,
+##                  ):
+##         """Create an object database.
 
-        storage: the storage used by the database, e.g. FileStorage
-        pool_size: expected maximum number of open connections
-        cache_size: target size of Connection object cache, in number of
-            objects
-        version_pool_size: expected maximum number of connections (per
-            version)
-        version_cache_size: target size of Connection object cache for
-             version connections, in number of objects
-        database_name: when using a multi-database, the name of this DB
-            within the database group.  It's a (detected) error if databases
-            is specified too and database_name is already a key in it.
-            This becomes the value of the DB's database_name attribute.
-        databases: when using a multi-database, a mapping to use as the
-            binding of this DB's .databases attribute.  It's intended
-            that the second and following DB's added to a multi-database
-            pass the .databases attribute set on the first DB added to the
-            collection.
-        """
+##         storage: the storage used by the database, e.g. FileStorage
+##         pool_size: expected maximum number of open connections
+##         cache_size: target size of Connection object cache, in number of
+##             objects
+##         version_pool_size: expected maximum number of connections (per
+##             version)
+##         version_cache_size: target size of Connection object cache for
+##              version connections, in number of objects
+##         database_name: when using a multi-database, the name of this DB
+##             within the database group.  It's a (detected) error if databases
+##             is specified too and database_name is already a key in it.
+##             This becomes the value of the DB's database_name attribute.
+##         databases: when using a multi-database, a mapping to use as the
+##             binding of this DB's .databases attribute.  It's intended
+##             that the second and following DB's added to a multi-database
+##             pass the .databases attribute set on the first DB added to the
+##             collection.
+##         """
 
     databases = Attribute("""\
         A mapping from database name to DB (database) object.
@@ -328,6 +339,12 @@
         entry.
         """)
 
+    def invalidateCache():
+        """Invalidate all objects in the database object caches
+
+        invalidateCache will be called on each of the database's connections.
+        """
+
 class IStorage(Interface):
     """A storage is responsible for storing and retrieving data of objects.
     """

Modified: ZODB/branches/3.7/src/ZODB/tests/testConnection.py
===================================================================
--- ZODB/branches/3.7/src/ZODB/tests/testConnection.py	2006-08-15 21:36:17 UTC (rev 69548)
+++ ZODB/branches/3.7/src/ZODB/tests/testConnection.py	2006-08-15 23:35:21 UTC (rev 69549)
@@ -478,6 +478,80 @@
 
         """
 
+def test_invalidateCache():
+    """\
+
+The invalidateCache method invalidates a connection's cache.  It also
+prevents reads until the end of a transaction.
+
+    >>> from ZODB.tests.util import DB
+    >>> import transaction
+    >>> db = DB()
+    >>> tm = transaction.TransactionManager()
+    >>> connection = db.open(transaction_manager=tm)
+    >>> connection.root()['a'] = StubObject()
+    >>> connection.root()['a'].x = 1
+    >>> connection.root()['b'] = StubObject()
+    >>> connection.root()['b'].x = 1
+    >>> connection.root()['c'] = StubObject()
+    >>> connection.root()['c'].x = 1
+    >>> tm.commit()
+    >>> connection.root()['b']._p_deactivate()
+    >>> connection.root()['c'].x = 2
+
+So we have a connection and an active transaction with some
+modifications. Lets call invalidateCache:
+
+    >>> connection.invalidateCache()
+
+Now, if we try to load an object, we'll get a read conflict:
+
+    >>> connection.root()['b'].x
+    Traceback (most recent call last):
+    ...
+    ReadConflictError: database read conflict error
+
+If we try to commit the transaction, we'll get a conflict error:
+
+    >>> tm.commit()
+    Traceback (most recent call last):
+    ...
+    ConflictError: database conflict error
+
+and the cache will have been cleared:
+
+    >>> print connection.root()['a']._p_changed
+    None
+    >>> print connection.root()['b']._p_changed
+    None
+    >>> print connection.root()['c']._p_changed
+    None
+
+But we'll be able to access data again:
+
+    >>> connection.root()['b'].x
+    1
+
+Aborting a transaction after a read conflict also lets us read data
+and go on about our business:
+    
+    >>> connection.invalidateCache()
+
+    >>> connection.root()['c'].x
+    Traceback (most recent call last):
+    ...
+    ReadConflictError: database read conflict error
+
+    >>> tm.abort()
+    >>> connection.root()['c'].x
+    1
+
+    >>> connection.root()['c'].x = 2
+    >>> tm.commit()
+
+    >>> db.close()
+    """
+
 # ---- stubs
 
 class StubObject(Persistent):

Modified: ZODB/branches/3.7/src/ZODB/tests/testDB.py
===================================================================
--- ZODB/branches/3.7/src/ZODB/tests/testDB.py	2006-08-15 21:36:17 UTC (rev 69548)
+++ ZODB/branches/3.7/src/ZODB/tests/testDB.py	2006-08-15 23:35:21 UTC (rev 69549)
@@ -18,6 +18,8 @@
 
 import transaction
 
+from zope.testing import doctest
+
 import ZODB
 import ZODB.FileStorage
 
@@ -130,5 +132,49 @@
         self.assertEqual(nconn(pools), 3)
 
 
+def test_invalidateCache():
+    """\
+
+The invalidateCache method invalidates a connection caches for all of the connections attached to a database.
+
+    >>> from ZODB.tests.util import DB
+    >>> import transaction
+    >>> db = DB()
+    >>> tm1 = transaction.TransactionManager()
+    >>> c1 = db.open(transaction_manager=tm1)
+    >>> c1.root()['a'] = MinPO(1)
+    >>> tm1.commit()
+    >>> tm2 = transaction.TransactionManager()
+    >>> c2 = db.open(transaction_manager=tm2)
+    >>> c1.root()['a']._p_deactivate()
+    >>> tm3 = transaction.TransactionManager()
+    >>> c3 = db.open(transaction_manager=tm3)
+    >>> c3.root()['a'].value
+    1
+    >>> c3.close()
+    >>> db.invalidateCache()
+    
+    >>> c1.root()['a'].value
+    Traceback (most recent call last):
+    ...
+    ReadConflictError: database read conflict error
+    
+    >>> c2.root()['a'].value
+    Traceback (most recent call last):
+    ...
+    ReadConflictError: database read conflict error
+
+    >>> c3 is db.open(transaction_manager=tm3)
+    True
+    >>> print c3.root()['a']._p_changed
+    None
+
+    >>> db.close()
+    """
+
+
+
 def test_suite():
-    return unittest.makeSuite(DBTests)
+    s = unittest.makeSuite(DBTests)
+    s.addTest(doctest.DocTestSuite())
+    return s



More information about the Zodb-checkins mailing list