[Checkins] SVN: ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py Fixed several invalidation-during-connection bugs that could cause

Jim Fulton jim at zope.com
Fri Jul 11 17:41:42 EDT 2008


Log message for revision 88271:
  Fixed several invalidation-during-connection bugs that could cause
  cache corruption or inconsistencies.
  

Changed:
  U   ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py
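
The heart of the fix is to defer invalidations that arrive while the client
cache is still being verified: they are written to a temporary pickle log and
replayed only after verification finishes, so nothing is lost or applied
against a half-verified cache. Below is a minimal, standalone sketch of that
buffer-and-replay pattern; the class and method names are hypothetical (this
is not the ClientStorage API), and it uses the stdlib pickle module in place
of cPickle.

    import pickle
    import tempfile

    class VerificationBuffer(object):  # hypothetical helper, not part of ZEO
        """Buffer invalidations received while cache verification runs."""

        def __init__(self):
            self._tfile = None
            self._pickler = None

        def start(self):
            # Begin verification: open a temp file for interim invalidations.
            self._tfile = tempfile.TemporaryFile(suffix=".inv")
            self._pickler = pickle.Pickler(self._tfile, 1)

        def in_progress(self):
            return self._pickler is not None

        def record(self, oid, version, tid):
            # Called for invalidations that arrive mid-verification.
            self._pickler.dump((oid, version, tid))

        def finish(self, apply_invalidation, min_tid=None):
            # Replay the log, skipping entries no newer than the last tid the
            # cache has already seen (min_tid), then discard the temp file.
            self._pickler.dump((None, None, None))  # end-of-data marker
            self._pickler = None
            self._tfile.seek(0)
            unpickler = pickle.Unpickler(self._tfile)
            while 1:
                oid, version, tid = unpickler.load()
                if oid is None:
                    break
                if tid is None or min_tid is None or tid > min_tid:
                    apply_invalidation(oid, version, tid)
            self._tfile.close()
            self._tfile = None

The tid filter in finish() mirrors the InvalidationLogIterator generator in
the patch below: an entry recorded during verification is applied only if its
tid is newer than the last tid already reflected in the cache, or if either
tid is unknown.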

-=-
Modified: ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py	2008-07-11 21:41:34 UTC (rev 88270)
+++ ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py	2008-07-11 21:41:40 UTC (rev 88271)
@@ -491,6 +491,7 @@
             # If we are upgrading from a read-only fallback connection,
             # we must close the old connection to prevent it from being
             # used while the cache is verified against the new connection.
+            self._connection.register_object(None) # Don't call me!
             self._connection.close()
             self._connection = None
             self._ready.clear()
@@ -558,54 +559,6 @@
         else:
             return '%s:%s' % (self._storage, self._server_addr)
 
-    def verify_cache(self, server):
-        """Internal routine called to verify the cache.
-
-        The return value (indicating which path we took) is used by
-        the test suite.
-        """
-
-        # If verify_cache() finishes the cache verification process,
-        # it should set self._server.  If it goes through full cache
-        # verification, then endVerify() should self._server.
-
-        last_inval_tid = self._cache.getLastTid()
-        if last_inval_tid is not None:
-            ltid = server.lastTransaction()
-            if ltid == last_inval_tid:
-                log2("No verification necessary (last_inval_tid up-to-date)")
-                self._server = server
-                self._ready.set()
-                return "no verification"
-
-            # log some hints about last transaction
-            log2("last inval tid: %r %s\n"
-                 % (last_inval_tid, tid2time(last_inval_tid)))
-            log2("last transaction: %r %s" %
-                 (ltid, ltid and tid2time(ltid)))
-
-            pair = server.getInvalidations(last_inval_tid)
-            if pair is not None:
-                log2("Recovering %d invalidations" % len(pair[1]))
-                self.invalidateTransaction(*pair)
-                self._server = server
-                self._ready.set()
-                return "quick verification"
-
-        log2("Verifying cache")
-        # setup tempfile to hold zeoVerify results
-        self._tfile = tempfile.TemporaryFile(suffix=".inv")
-        self._pickler = cPickle.Pickler(self._tfile, 1)
-        self._pickler.fast = 1 # Don't use the memo
-
-        # TODO:  should batch these operations for efficiency; would need
-        # to acquire lock ...
-        for oid, tid, version in self._cache.contents():
-            server.verify(oid, version, tid)
-        self._pending_server = server
-        server.endZeoVerify()
-        return "full verification"
-
     ### Is there a race condition between notifyConnected and
     ### notifyDisconnected? In Particular, what if we get
     ### notifyDisconnected in the middle of notifyConnected?
@@ -1224,8 +1177,6 @@
         """Storage API: return a sequence of versions in the storage."""
         return self._server.versions(max)
 
-    # Below are methods invoked by the StorageServer
-
     def serialnos(self, args):
         """Server callback to pass a list of changed (oid, serial) pairs."""
         self._serials.extend(args)
@@ -1234,6 +1185,57 @@
         """Server callback to update the info dictionary."""
         self._info.update(dict)
 
+    def verify_cache(self, server):
+        """Internal routine called to verify the cache.
+
+        The return value (indicating which path we took) is used by
+        the test suite.
+        """
+
+        self._pending_server = server
+
+        # setup tempfile to hold zeoVerify results and interim
+        # invalidation results
+        self._tfile = tempfile.TemporaryFile(suffix=".inv")
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
+        # allow incoming invalidations:
+        self._connection.register_object(self)
+
+        # If verify_cache() finishes the cache verification process,
+        # it should set self._server.  If it goes through full cache
+        # verification, then endVerify() should set self._server.
+
+        last_inval_tid = self._cache.getLastTid()
+        if last_inval_tid is not None:
+            ltid = server.lastTransaction()
+            if ltid == last_inval_tid:
+                log2("No verification necessary (last_inval_tid up-to-date)")
+                self.finish_verification()
+                return "no verification"
+
+            # log some hints about last transaction
+            log2("last inval tid: %r %s\n"
+                 % (last_inval_tid, tid2time(last_inval_tid)))
+            log2("last transaction: %r %s" %
+                 (ltid, ltid and tid2time(ltid)))
+
+            pair = server.getInvalidations(last_inval_tid)
+            if pair is not None:
+                log2("Recovering %d invalidations" % len(pair[1]))
+                self.finish_verification(pair)
+                return "quick verification"
+
+        log2("Verifying cache")
+
+        # TODO:  should batch these operations for efficiency; would need
+        # to acquire lock ...
+        for oid, tid, version in self._cache.contents():
+            server.verify(oid, version, tid)
+        server.endZeoVerify()
+        return "full verification"
+
     def invalidateVerify(self, args):
         """Server callback to invalidate an (oid, version) pair.
 
@@ -1245,68 +1247,93 @@
             # This should never happen.  TODO:  assert it doesn't, or log
             # if it does.
             return
-        self._pickler.dump(args)
+        oid, version = args
+        self._pickler.dump((oid, version, None))
 
-    def _process_invalidations(self, invs):
-        # Invalidations are sent by the ZEO server as a sequence of
-        # oid, version pairs.  The DB's invalidate() method expects a
-        # dictionary of oids.
+    def endVerify(self):
+        """Server callback to signal end of cache validation."""
 
+        log2("endVerify finishing")
+        self.finish_verification()
+        log2("endVerify finished")
+
+    def finish_verification(self, catch_up=None):
         self._lock.acquire()
         try:
-            # versions maps version names to dictionary of invalidations
-            versions = {}
-            for oid, version, tid in invs:
-                if oid == self._load_oid:
-                    self._load_status = 0
-                self._cache.invalidate(oid, version, tid)
-                oids = versions.get((version, tid))
-                if not oids:
-                    versions[(version, tid)] = [oid]
-                else:
-                    oids.append(oid)
+            if catch_up:
+                # process catch-up invalidations
+                tid, invalidations = catch_up
+                self._process_invalidations(
+                    (oid, version, tid)
+                    for oid, version in invalidations
+                    )
+            
+            if self._pickler is None:
+                return
+            # write end-of-data marker
+            self._pickler.dump((None, None, None))
+            self._pickler = None
+            self._tfile.seek(0)
+            unpickler = cPickle.Unpickler(self._tfile)
+            min_tid = self._cache.getLastTid()
+            def InvalidationLogIterator():
+                while 1:
+                    oid, version, tid = unpickler.load()
+                    if oid is None:
+                        break
+                    if ((tid is None)
+                        or (min_tid is None)
+                        or (tid > min_tid)
+                        ):
+                        yield oid, version, tid
 
-            if self._db is not None:
-                for (version, tid), d in versions.items():
-                    self._db.invalidate(tid, d, version=version)
+            self._process_invalidations(InvalidationLogIterator())
+            self._tfile.close()
+            self._tfile = None
         finally:
             self._lock.release()
 
-    def endVerify(self):
-        """Server callback to signal end of cache validation."""
-        if self._pickler is None:
-            return
-        # write end-of-data marker
-        self._pickler.dump((None, None))
-        self._pickler = None
-        self._tfile.seek(0)
-        f = self._tfile
-        self._tfile = None
-        self._process_invalidations(InvalidationLogIterator(f))
-        f.close()
-
-        log2("endVerify finishing")
         self._server = self._pending_server
         self._ready.set()
-        self._pending_conn = None
-        log2("endVerify finished")
+        self._pending_server = None
 
+
     def invalidateTransaction(self, tid, args):
-        """Invalidate objects modified by tid."""
+        """Server callback: Invalidate objects modified by tid."""
         self._lock.acquire()
         try:
-            self._cache.setLastTid(tid)
+            if self._pickler is not None:
+                log2("Transactional invalidation during cache verification",
+                     level=BLATHER)
+                for oid, version in args:
+                    self._pickler.dump((oid, version, tid))
+                return
+            self._process_invalidations([(oid, version, tid)
+                                         for oid, version in args])
         finally:
             self._lock.release()
-        if self._pickler is not None:
-            log2("Transactional invalidation during cache verification",
-                 level=BLATHER)
-            for t in args:
-                self._pickler.dump(t)
-            return
-        self._process_invalidations([(oid, version, tid)
-                                     for oid, version in args])
 
+    def _process_invalidations(self, invs):
+        # Invalidations are sent by the ZEO server as a sequence of
+        # oid, version, tid triples.  The DB's invalidate() method expects a
+        # dictionary of oids.
+
+        # versions maps version names to dictionary of invalidations
+        versions = {}
+        for oid, version, tid in invs:
+            if oid == self._load_oid:
+                self._load_status = 0
+            self._cache.invalidate(oid, version, tid)
+            oids = versions.get((version, tid))
+            if not oids:
+                versions[(version, tid)] = [oid]
+            else:
+                oids.append(oid)
+
+        if self._db is not None:
+            for (version, tid), d in versions.items():
+                self._db.invalidate(tid, d, version=version)
+
     # The following are for compatibility with protocol version 2.0.0
 
     def invalidateTrans(self, args):
@@ -1315,11 +1342,3 @@
     invalidate = invalidateVerify
     end = endVerify
     Invalidate = invalidateTrans
-
-def InvalidationLogIterator(fileobj):
-    unpickler = cPickle.Unpickler(fileobj)
-    while 1:
-        oid, version = unpickler.load()
-        if oid is None:
-            break
-        yield oid, version, None
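
A second, easy-to-miss part of the change above: invalidateTransaction now
decides whether to buffer or process an invalidation while holding the client
lock, rather than releasing the lock before checking for an in-progress
verification. A stripped-down illustration of that ordering follows; the
names are hypothetical stand-ins (assuming a plain threading.Lock), not the
real ClientStorage class.

    import threading

    class InvalidationSink(object):  # hypothetical stand-in for the client
        def __init__(self):
            self._lock = threading.Lock()
            self._verify_log = None     # non-None while verification runs
            self._applied = []

        def invalidateTransaction(self, tid, args):
            # Check for an in-progress verification and act on the result
            # under the same lock acquisition, so finishing verification
            # cannot race in between the check and the buffering step.
            self._lock.acquire()
            try:
                if self._verify_log is not None:
                    for oid, version in args:
                        self._verify_log.append((oid, version, tid))
                    return
                for oid, version in args:
                    self._applied.append((oid, version, tid))
            finally:
                self._lock.release()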


