[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.10 ClientStorage.py:1.73.2.13

Jeremy Hylton jeremy@zope.com
Tue, 29 Apr 2003 17:39:57 -0400


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv5133/ZEO

Modified Files:
      Tag: ZODB3-3_1-branch
	StorageServer.py ClientStorage.py 
Log Message:
Backport assorted ZEO fixes.

Make isReadOnly() report the right answer with a read-only fallback
connection.
Don't allow client to read cache data during verification.


=== ZODB3/ZEO/StorageServer.py 1.74.2.9 => 1.74.2.10 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.9	Tue Jan 28 13:11:22 2003
+++ ZODB3/ZEO/StorageServer.py	Tue Apr 29 17:39:56 2003
@@ -24,8 +24,8 @@
 import cPickle
 import os
 import sys
-import time
 import threading
+import time
 
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
@@ -383,6 +383,7 @@
                     self.client.invalidateVerify((oid, ''))
 
     def endZeoVerify(self):
+        log("received endZeoVerify")
         self.client.endVerify()
 
     def pack(self, time, wait=1):


=== ZODB3/ZEO/ClientStorage.py 1.73.2.12 => 1.73.2.13 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.12	Wed Jan 29 14:39:24 2003
+++ ZODB3/ZEO/ClientStorage.py	Tue Apr 29 17:39:56 2003
@@ -201,13 +201,40 @@
             wait = 1
 
         self._addr = addr # For tests
+
+        # A ZEO client can run in disconnected mode, using data from
+        # its cache, or in connected mode.  Several instance variables
+        # are related to whether the client is connected.
+
+        # _server: All method calls are invoked through the server
+        #    stub.  When not connected, set to disconnected_stub, an
+        #    object that raises ClientDisconnected errors.
+
+        # _ready: A threading Event that is set only if _server
+        #    is set to a real stub.
+
+        # _connection: The current zrpc connection or None.
+
+        # _connection is set as soon as a connection is established,
+        # but _server is set only after cache verification has finished
+        # and clients can safely use the server.  _pending_server holds
+        # a server stub while it is being verified.
+        
         self._server = disconnected_stub
+        self._connection = None
+        self._pending_server = None
+        self._ready = threading.Event()
+
+        # _is_read_only stores the constructor argument
         self._is_read_only = read_only
+        # _conn_is_read_only stores the status of the current connection
+        self._conn_is_read_only = 0
         self._storage = storage
         self._read_only_fallback = read_only_fallback
-        self._connection = None
         # _server_addr is used by sortKey()
         self._server_addr = None
+        self._tfile = None
+        self._pickler = None
 
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
                       'supportsUndo':0, 'supportsVersions': 0,
@@ -254,15 +281,47 @@
                                                     tmax=max_disconnect_poll)
 
         if wait:
-            self._rpc_mgr.connect(sync=1)
+            self._wait()
         else:
+            # attempt_connect() will make an attempt that doesn't block
+            # "too long," for a very vague notion of too long.  If that
+            # doesn't succeed, call connect() to start a thread.
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
+            # If the connect hasn't occurred, run with cached data.
+            if not self._ready.isSet():
+                self._cache.open()
+
+    def _wait(self):
+        # Wait for a connection to be established.
+        self._rpc_mgr.connect(sync=1)
+        # When a synchronous connect() call returns, there is
+        # a valid _connection object but cache validation may
+        # still be going on.  This code must wait until validation
+        # finishes, but if the connection isn't a zrpc async
+        # connection it also needs to poll for input.
+        if self._connection.is_async():
+            while 1:
+                self._ready.wait(30)
+                if self._ready.isSet():
+                    break
+                log2(INFO, "Wait for cache verification to finish")
+        else:
+            self._wait_sync()
 
-        # If we're connected at this point, the cache is opened as a
-        # side effect of verify_cache().  If not, open it now.
-        if not self.is_connected():
-            self._cache.open()
+    def _wait_sync(self):
+        # If there is no mainloop running, this code needs
+        # to call poll() to cause asyncore to handle events.
+        while 1:
+            if self._ready.isSet():
+                break
+            log2(INFO, "Wait for cache verification to finish")
+            if self._connection is None:
+                # If the connection was closed while we were
+                # waiting for it to become ready, start over.
+                return self._wait()
+            else:
+                self._connection.pending(30)
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
@@ -295,7 +354,13 @@
 
         This is called by the sync method in ZODB.Connection.
         """
-        self._server._update()
+        # If there is no connection, return immediately.  Technically,
+        # there are no pending invalidations so they are all handled.
+        # There doesn't seem to be much benefit to raising an exception.
+        
+        cn = self._connection
+        if cn is not None:
+            cn.pending()
 
     def testConnection(self, conn):
         """Internal: test the given connection.
@@ -320,6 +385,7 @@
         """
         log2(INFO, "Testing connection %r" % conn)
         # XXX Check the protocol version here?
+        self._conn_is_read_only = 0
         stub = self.StorageServerStubClass(conn)
         try:
             stub.register(str(self._storage), self._is_read_only)
@@ -329,6 +395,7 @@
                 raise
             log2(INFO, "Got ReadOnlyError; trying again with read_only=1")
             stub.register(str(self._storage), read_only=1)
+            self._conn_is_read_only = 1
             return 0
 
     def notifyConnected(self, conn):
@@ -337,28 +404,37 @@
         This is called by ConnectionManager after it has decided which
         connection should be used.
         """
+        if self._cache is None:
+            # the storage was closed, but the connect thread called
+            # this method before it was stopped.
+            return
+
         # XXX would like to report whether we get a read-only connection
         if self._connection is not None:
             reconnect = 1
         else:
             reconnect = 0
-        addr = conn.get_addr()
-        self.set_server_addr(addr)
-        stub = self.StorageServerStubClass(conn)
-        self._oids = []
-        self._info.update(stub.get_info())
-        self.verify_cache(stub)
+        self.set_server_addr(conn.get_addr())
 
-        # XXX The stub should be saved here and set in endVerify() below.
+        # If we are upgrading from a read-only fallback connection,
+        # we must close the old connection to prevent it from being
+        # used while the cache is verified against the new connection.
         if self._connection is not None:
             self._connection.close()
         self._connection = conn
-        self._server = stub
 
         if reconnect:
-            log2(INFO, "Reconnected to storage: %s" % repr(addr))
+            log2(INFO, "Reconnected to storage: %s" % self._server_addr)
         else:
-            log2(INFO, "Connected to storage: %s" % repr(addr))
+            log2(INFO, "Connected to storage: %s" % self._server_addr)
+
+        stub = self.StorageServerStubClass(conn)
+        self._oids = []
+        self._info.update(stub.get_info())
+        self.verify_cache(stub)
+        if not conn.is_async():
+            log2(INFO, "Waiting for cache verification to finish")
+            self._wait_sync()
 
     def set_server_addr(self, addr):
         # Normalize server address and convert to string
@@ -389,10 +465,15 @@
 
     def verify_cache(self, server):
         """Internal routine called to verify the cache."""
-        # XXX beginZeoVerify ends up calling back to beginVerify() below.
-        # That whole exchange is rather unnecessary.
-        server.beginZeoVerify()
+        log2(INFO, "Verifying cache")
+        # setup tempfile to hold zeoVerify results
+        self._tfile = tempfile.TemporaryFile(suffix=".inv")
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
         self._cache.verify(server.zeoVerify)
+        self._pending_server = server
+        log2(INFO, "Calling endZeoVerify on server")
         server.endZeoVerify()
 
     ### Is there a race condition between notifyConnected and
@@ -412,6 +493,7 @@
         log2(PROBLEM, "Disconnected from storage: %s"
              % repr(self._server_addr))
         self._connection = None
+        self._ready.clear()
         self._server = disconnected_stub
 
     def __len__(self):
@@ -454,7 +536,13 @@
         XXX In read-only fallback mode, this returns false, even if we
         are currently connected to a read-only server.
         """
-        return self._is_read_only
+        if self._is_read_only:
+            return 1
+        else:
+            # If the client is configured for a read-write connection
+            # but has a read-only fallback connection, _conn_is_read_only
+            # will be True.
+            return self._conn_is_read_only
 
     def _check_trans(self, trans):
         """Internal helper to check a transaction argument for sanity."""
@@ -780,14 +868,6 @@
         """Server callback to update the info dictionary."""
         self._info.update(dict)
 
-    def beginVerify(self):
-        """Server callback to signal start of cache validation."""
-        log2(INFO, "begin cache verification")
-        self._verify_start = time.time()
-        self._tfile = tempfile.TemporaryFile(suffix=".inv")
-        self._pickler = cPickle.Pickler(self._tfile, 1)
-        self._pickler.fast = 1 # Don't use the memo
-
     def invalidateVerify(self, args):
         """Server callback to invalidate an (oid, version) pair.
 
@@ -800,31 +880,33 @@
             return
         self._pickler.dump(args)
 
+    def _process_invalidations(self, invs):
+        # Invalidations are sent by the ZEO server as a sequence of
+        # oid, version pairs.  The DB's invalidate() method expects a
+        # dictionary of oids.
+
+        for oid, version in invs:
+            self._cache.invalidate(oid, version=version)
+            self._db.invalidate(oid, version=version)
+
     def endVerify(self):
         """Server callback to signal end of cache validation."""
         if self._pickler is None:
-            # XXX This should never happen
             return
-        self._pickler.dump((0,0))
+        # write end-of-data marker
+        self._pickler.dump((None, None))
+        self._pickler = None
         self._tfile.seek(0)
-        unpick = cPickle.Unpickler(self._tfile)
         f = self._tfile
         self._tfile = None
-
-        ninval = 0
-
-        while 1:
-            oid, version = unpick.load()
-            if not oid:
-                break
-            ninval += 1
-            self._cache.invalidate(oid, version=version)
-            self._db.invalidate(oid, version=version)
+        self._process_invalidations(InvalidationLogIterator(f))
         f.close()
 
-        elapsed = time.time() - self._verify_start
-        log2(INFO, "end cache verification (%d invalidations, %.3g seconds)" %
-             (ninval, elapsed))
+        log2(INFO, "endVerify finishing")
+        self._server = self._pending_server
+        self._ready.set()
+        self._pending_conn = None
+        log2(INFO, "endVerify finished")
 
     def invalidateTrans(self, args):
         """Server callback to invalidate a list of (oid, version) pairs.
@@ -848,7 +930,40 @@
     # don't want that.  So here we alias the old names to their new
     # implementations.
 
-    begin = beginVerify
     invalidate = invalidateVerify
     end = endVerify
     Invalidate = invalidateTrans
+
+try:
+    StopIteration
+except NameError:
+    class StopIteration(Exception):
+        pass
+
+class InvalidationLogIterator:
+    """Helper class for reading invalidations in endVerify."""
+
+    def __init__(self, fileobj):
+        self._unpickler = cPickle.Unpickler(fileobj)
+        self.getitem_i = 0
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        oid, version = self._unpickler.load()
+        if oid is None:
+            raise StopIteration
+        return oid, version
+
+    # The __getitem__() method is needed to support iteration
+    # in Python 2.1.
+
+    def __getitem__(self, i):
+        assert i == self.getitem_i
+        try:
+            obj = self.next()
+        except StopIteration:
+            raise IndexError, i
+        self.getitem_i += 1
+        return obj