[Zodb-checkins] SVN: ZODB/branches/ctheune-bushy-directory-3.8/ Removed merge of changes from 3.8 branch.

Jim Fulton jim at zope.com
Wed Aug 27 05:47:18 EDT 2008


Log message for revision 90416:
  Removed merge of changes from 3.8 branch.
  

Changed:
  U   ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py
  D   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt
  U   ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py

-=-
Modified: ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt	2008-08-27 09:47:18 UTC (rev 90416)
@@ -4,15 +4,6 @@
 
 Bugs Fixed:
 
-- (???) Fixed bug #251037: Made packing of blob storages non-blocking.
-
-- (beta 6) Fixed a bug that could cause InvalidObjectReference errors
-  for objects that were explicitly added to a database if the object
-  was modified after a savepoint that added the object.
-
-- (beta 5) Fixed several bugs that caused ZEO cache corruption when connecting
-  to servers. These bugs affected both persistent and non-persistent caches. 
-
 - (beta 5) Improved the the ZEO client shutdown support to try to
   avoid spurious errors on exit, especially for scripts, such as zeopack.
 

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -251,6 +251,8 @@
 
         # _is_read_only stores the constructor argument
         self._is_read_only = read_only
+        # _conn_is_read_only stores the status of the current connection
+        self._conn_is_read_only = 0
         self._storage = storage
         self._read_only_fallback = read_only_fallback
         self._username = username
@@ -338,6 +340,8 @@
         else:
             cache_path = None
         self._cache = self.ClientCacheClass(cache_path, size=cache_size)
+        # TODO:  maybe there's a better time to open the cache?  Unclear.
+        self._cache.open()
 
         self._rpc_mgr = self.ConnectionManagerClass(addr, self,
                                                     tmin=min_disconnect_poll,
@@ -378,18 +382,13 @@
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
-        if self._rpc_mgr is not None:
-            self._rpc_mgr.close()
-            self._rpc_mgr = None
-        if self._connection is not None:
-            self._connection.register_object(None) # Don't call me!
-            self._connection.close()
-            self._connection = None
-
         self._tbuf.close()
         if self._cache is not None:
             self._cache.close()
             self._cache = None
+        if self._rpc_mgr is not None:
+            self._rpc_mgr.close()
+            self._rpc_mgr = None
 
     def registerDB(self, db):
         """Storage API: register a database for invalidation messages.
@@ -455,7 +454,7 @@
         """
         log2("Testing connection %r" % conn)
         # TODO:  Should we check the protocol version here?
-        conn._is_read_only = self._is_read_only
+        self._conn_is_read_only = 0
         stub = self.StorageServerStubClass(conn)
 
         auth = stub.getAuthProtocol()
@@ -477,7 +476,7 @@
                 raise
             log2("Got ReadOnlyError; trying again with read_only=1")
             stub.register(str(self._storage), read_only=1)
-            conn._is_read_only = True
+            self._conn_is_read_only = 1
             return 0
 
     def notifyConnected(self, conn):
@@ -491,26 +490,24 @@
             # this method before it was stopped.
             return
 
+        # invalidate our db cache
+        if self._db is not None:
+            self._db.invalidateCache()
 
+        # TODO:  report whether we get a read-only connection.
         if self._connection is not None:
-            # If we are upgrading from a read-only fallback connection,
-            # we must close the old connection to prevent it from being
-            # used while the cache is verified against the new connection.
-            self._connection.register_object(None) # Don't call me!
-            self._connection.close()
-            self._connection = None
-            self._ready.clear()
             reconnect = 1
         else:
             reconnect = 0
-
         self.set_server_addr(conn.get_addr())
+
+        # If we are upgrading from a read-only fallback connection,
+        # we must close the old connection to prevent it from being
+        # used while the cache is verified against the new connection.
+        if self._connection is not None:
+            self._connection.close()
         self._connection = conn
 
-        # invalidate our db cache
-        if self._db is not None:
-            self._db.invalidateCache()
-
         if reconnect:
             log2("Reconnected to storage: %s" % self._server_addr)
         else:
@@ -564,6 +561,54 @@
         else:
             return '%s:%s' % (self._storage, self._server_addr)
 
+    def verify_cache(self, server):
+        """Internal routine called to verify the cache.
+
+        The return value (indicating which path we took) is used by
+        the test suite.
+        """
+
+        # If verify_cache() finishes the cache verification process,
+        # it should set self._server.  If it goes through full cache
+        # verification, then endVerify() should self._server.
+
+        last_inval_tid = self._cache.getLastTid()
+        if last_inval_tid is not None:
+            ltid = server.lastTransaction()
+            if ltid == last_inval_tid:
+                log2("No verification necessary (last_inval_tid up-to-date)")
+                self._server = server
+                self._ready.set()
+                return "no verification"
+
+            # log some hints about last transaction
+            log2("last inval tid: %r %s\n"
+                 % (last_inval_tid, tid2time(last_inval_tid)))
+            log2("last transaction: %r %s" %
+                 (ltid, ltid and tid2time(ltid)))
+
+            pair = server.getInvalidations(last_inval_tid)
+            if pair is not None:
+                log2("Recovering %d invalidations" % len(pair[1]))
+                self.invalidateTransaction(*pair)
+                self._server = server
+                self._ready.set()
+                return "quick verification"
+
+        log2("Verifying cache")
+        # setup tempfile to hold zeoVerify results
+        self._tfile = tempfile.TemporaryFile(suffix=".inv")
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
+        # TODO:  should batch these operations for efficiency; would need
+        # to acquire lock ...
+        for oid, tid, version in self._cache.contents():
+            server.verify(oid, version, tid)
+        self._pending_server = server
+        server.endZeoVerify()
+        return "full verification"
+
     ### Is there a race condition between notifyConnected and
     ### notifyDisconnected? In Particular, what if we get
     ### notifyDisconnected in the middle of notifyConnected?
@@ -629,16 +674,12 @@
     def isReadOnly(self):
         """Storage API: return whether we are in read-only mode."""
         if self._is_read_only:
-            return True
+            return 1
         else:
             # If the client is configured for a read-write connection
-            # but has a read-only fallback connection, conn._is_read_only
-            # will be True.  If self._connection is None, we'll behave as
-            # read_only
-            try:
-                return self._connection._is_read_only
-            except AttributeError:
-                return True
+            # but has a read-only fallback connection, _conn_is_read_only
+            # will be True.
+            return self._conn_is_read_only
 
     def _check_trans(self, trans):
         """Internal helper to check a transaction argument for sanity."""
@@ -1111,7 +1152,7 @@
             return
 
         for oid, version, data in self._tbuf:
-            self._cache.invalidate(oid, version, tid, False)
+            self._cache.invalidate(oid, version, tid)
             # If data is None, we just invalidate.
             if data is not None:
                 s = self._seriald[oid]
@@ -1169,6 +1210,8 @@
         """Storage API: return a sequence of versions in the storage."""
         return self._server.versions(max)
 
+    # Below are methods invoked by the StorageServer
+
     def serialnos(self, args):
         """Server callback to pass a list of changed (oid, serial) pairs."""
         self._serials.extend(args)
@@ -1177,57 +1220,6 @@
         """Server callback to update the info dictionary."""
         self._info.update(dict)
 
-    def verify_cache(self, server):
-        """Internal routine called to verify the cache.
-
-        The return value (indicating which path we took) is used by
-        the test suite.
-        """
-
-        self._pending_server = server
-
-        # setup tempfile to hold zeoVerify results and interim
-        # invalidation results
-        self._tfile = tempfile.TemporaryFile(suffix=".inv")
-        self._pickler = cPickle.Pickler(self._tfile, 1)
-        self._pickler.fast = 1 # Don't use the memo
-
-        # allow incoming invalidations:
-        self._connection.register_object(self)
-
-        # If verify_cache() finishes the cache verification process,
-        # it should set self._server.  If it goes through full cache
-        # verification, then endVerify() should self._server.
-
-        last_inval_tid = self._cache.getLastTid()
-        if last_inval_tid is not None:
-            ltid = server.lastTransaction()
-            if ltid == last_inval_tid:
-                log2("No verification necessary (last_inval_tid up-to-date)")
-                self.finish_verification()
-                return "no verification"
-
-            # log some hints about last transaction
-            log2("last inval tid: %r %s\n"
-                 % (last_inval_tid, tid2time(last_inval_tid)))
-            log2("last transaction: %r %s" %
-                 (ltid, ltid and tid2time(ltid)))
-
-            pair = server.getInvalidations(last_inval_tid)
-            if pair is not None:
-                log2("Recovering %d invalidations" % len(pair[1]))
-                self.finish_verification(pair)
-                return "quick verification"
-
-        log2("Verifying cache")
-
-        # TODO:  should batch these operations for efficiency; would need
-        # to acquire lock ...
-        for oid, tid, version in self._cache.contents():
-            server.verify(oid, version, tid)
-        server.endZeoVerify()
-        return "full verification"
-
     def invalidateVerify(self, args):
         """Server callback to invalidate an (oid, version) pair.
 
@@ -1239,93 +1231,68 @@
             # This should never happen.  TODO:  assert it doesn't, or log
             # if it does.
             return
-        oid, version = args
-        self._pickler.dump((oid, version, None))
+        self._pickler.dump(args)
 
-    def endVerify(self):
-        """Server callback to signal end of cache validation."""
+    def _process_invalidations(self, invs):
+        # Invalidations are sent by the ZEO server as a sequence of
+        # oid, version pairs.  The DB's invalidate() method expects a
+        # dictionary of oids.
 
-        log2("endVerify finishing")
-        self.finish_verification()
-        log2("endVerify finished")
-
-    def finish_verification(self, catch_up=None):
         self._lock.acquire()
         try:
-            if catch_up:
-                # process catch-up invalidations
-                tid, invalidations = catch_up
-                self._process_invalidations(
-                    (oid, version, tid)
-                    for oid, version in invalidations
-                    )
-            
-            if self._pickler is None:
-                return
-            # write end-of-data marker
-            self._pickler.dump((None, None, None))
-            self._pickler = None
-            self._tfile.seek(0)
-            unpickler = cPickle.Unpickler(self._tfile)
-            min_tid = self._cache.getLastTid()
-            def InvalidationLogIterator():
-                while 1:
-                    oid, version, tid = unpickler.load()
-                    if oid is None:
-                        break
-                    if ((tid is None)
-                        or (min_tid is None)
-                        or (tid > min_tid)
-                        ):
-                        yield oid, version, tid
+            # versions maps version names to dictionary of invalidations
+            versions = {}
+            for oid, version, tid in invs:
+                if oid == self._load_oid:
+                    self._load_status = 0
+                self._cache.invalidate(oid, version, tid)
+                oids = versions.get((version, tid))
+                if not oids:
+                    versions[(version, tid)] = [oid]
+                else:
+                    oids.append(oid)
 
-            self._process_invalidations(InvalidationLogIterator())
-            self._tfile.close()
-            self._tfile = None
+            if self._db is not None:
+                for (version, tid), d in versions.items():
+                    self._db.invalidate(tid, d, version=version)
         finally:
             self._lock.release()
 
+    def endVerify(self):
+        """Server callback to signal end of cache validation."""
+        if self._pickler is None:
+            return
+        # write end-of-data marker
+        self._pickler.dump((None, None))
+        self._pickler = None
+        self._tfile.seek(0)
+        f = self._tfile
+        self._tfile = None
+        self._process_invalidations(InvalidationLogIterator(f))
+        f.close()
+
+        log2("endVerify finishing")
         self._server = self._pending_server
         self._ready.set()
-        self._pending_server = None
+        self._pending_conn = None
+        log2("endVerify finished")
 
-
     def invalidateTransaction(self, tid, args):
-        """Server callback: Invalidate objects modified by tid."""
+        """Invalidate objects modified by tid."""
         self._lock.acquire()
         try:
-            if self._pickler is not None:
-                log2("Transactional invalidation during cache verification",
-                     level=BLATHER)
-                for oid, version in args:
-                    self._pickler.dump((oid, version, tid))
-                return
-            self._process_invalidations([(oid, version, tid)
-                                         for oid, version in args])
+            self._cache.setLastTid(tid)
         finally:
             self._lock.release()
+        if self._pickler is not None:
+            log2("Transactional invalidation during cache verification",
+                 level=BLATHER)
+            for t in args:
+                self._pickler.dump(t)
+            return
+        self._process_invalidations([(oid, version, tid)
+                                     for oid, version in args])
 
-    def _process_invalidations(self, invs):
-        # Invalidations are sent by the ZEO server as a sequence of
-        # oid, version, tid triples.  The DB's invalidate() method expects a
-        # dictionary of oids.
-
-        # versions maps version names to dictionary of invalidations
-        versions = {}
-        for oid, version, tid in invs:
-            if oid == self._load_oid:
-                self._load_status = 0
-            self._cache.invalidate(oid, version, tid)
-            oids = versions.get((version, tid))
-            if not oids:
-                versions[(version, tid)] = [oid]
-            else:
-                oids.append(oid)
-
-        if self._db is not None:
-            for (version, tid), d in versions.items():
-                self._db.invalidate(tid, d, version=version)
-
     # The following are for compatibility with protocol version 2.0.0
 
     def invalidateTrans(self, args):
@@ -1334,3 +1301,11 @@
     invalidate = invalidateVerify
     end = endVerify
     Invalidate = invalidateTrans
+
+def InvalidationLogIterator(fileobj):
+    unpickler = cPickle.Unpickler(fileobj)
+    while 1:
+        oid, version = unpickler.load()
+        if oid is None:
+            break
+        yield oid, version, None

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -30,7 +30,6 @@
 import logging
 import os
 import tempfile
-import threading
 import time
 
 import ZODB.fsIndex
@@ -122,22 +121,7 @@
 # to the end of the file that the new object can't fit in one
 # contiguous chunk, currentofs is reset to ZEC3_HEADER_SIZE first.
 
-class locked(object):
 
-    def __init__(self, func):
-        self.func = func
-
-    def __get__(self, inst, class_):
-        if inst is None:
-            return self
-        def call(*args, **kw):
-            inst._lock.acquire()
-            try:
-                return self.func(inst, *args, **kw)
-            finally:
-                inst._lock.release()
-        return call
-
 class ClientCache(object):
     """A simple in-memory cache."""
 
@@ -216,10 +200,6 @@
 
         self._setup_trace(path)
 
-        self.open()
-
-        self._lock = threading.RLock()
-
     # Backward compatibility. Client code used to have to use the fc
     # attr to get to the file cache to get cache stats.
     @property
@@ -373,7 +353,6 @@
     # instance, and also written out near the start of the cache file.  The
     # new tid must be strictly greater than our current idea of the most
     # recent tid.
-    @locked
     def setLastTid(self, tid):
         if self.tid is not None and tid <= self.tid:
             raise ValueError("new last tid (%s) must be greater than "
@@ -390,11 +369,10 @@
     # @return a transaction id
     # @defreturn string, or None if no transaction is yet known
     def getLastTid(self):
-        tid = self.tid
-        if tid == z64:
+        if self.tid == z64:
             return None
         else:
-            return tid
+            return self.tid
 
     ##
     # Return the current data record for oid and version.
@@ -404,7 +382,6 @@
     #         in the cache
     # @defreturn 3-tuple: (string, string, string)
 
-    @locked
     def load(self, oid, version=""):
         ofs = self.current.get(oid)
         if ofs is None:
@@ -437,7 +414,6 @@
     # @return data record, serial number, start tid, and end tid
     # @defreturn 4-tuple: (string, string, string, string)
 
-    @locked
     def loadBefore(self, oid, before_tid):
         noncurrent_for_oid = self.noncurrent.get(u64(oid))
         if noncurrent_for_oid is None:
@@ -479,7 +455,6 @@
     # @defreturn string or None
 
     # XXX This approac is wrong, but who cares
-    @locked
     def modifiedInVersion(self, oid):
         ofs = self.current.get(oid)
         if ofs is None:
@@ -507,7 +482,6 @@
     # @param data the actual data
     # @exception ValueError tried to store non-current version data
 
-    @locked
     def store(self, oid, version, start_tid, end_tid, data):
         # It's hard for the client to avoid storing the same object
         # more than once.  One case is when the client requests
@@ -607,30 +581,14 @@
     # data for `oid`, stop believing we have current data, and mark the
     # data we had as being valid only up to `tid`.  In all other cases, do
     # nothing.
-    #
-    # Paramters:
-    #
-    # - oid object id
-    # - version name of version to invalidate.
-    # - tid the id of the transaction that wrote a new revision of oid,
+    # @param oid object id
+    # @param version name of version to invalidate.
+    # @param tid the id of the transaction that wrote a new revision of oid,
     #        or None to forget all cached info about oid (version, current
     #        revision, and non-current revisions)
-    # - server_invalidation, a flag indicating whether the
-    #       invalidation has come from the server. It's possible, due
-    #       to threading issues, that when applying a local
-    #       invalidation after a store, that later invalidations from
-    #       the server may already have arrived.
-    
-    @locked
-    def invalidate(self, oid, version, tid, server_invalidation=True):
-        if tid is not None:
-            if tid > self.tid:
-                self.setLastTid(tid)
-            elif tid < self.tid:
-                if server_invalidation:
-                    raise ValueError("invalidation tid (%s) must not be less"
-                                     " than previous one (%s)" %
-                                     (u64(tid), u64(self.tid)))
+    def invalidate(self, oid, version, tid):
+        if tid > self.tid and tid is not None:
+            self.setLastTid(tid)
 
         ofs = self.current.get(oid)
         if ofs is None:
@@ -672,25 +630,17 @@
         seek = self.f.seek
         read = self.f.read
         for oid, ofs in self.current.iteritems():
+            seek(ofs)
+            assert read(1) == 'a', (ofs, self.f.tell(), oid)
+            size, saved_oid, tid, end_tid, lver = unpack(">I8s8s8sh", read(30))
+            assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
+            assert end_tid == z64, (ofs, self.f.tell(), oid)
+            if lver:
+                version = read(lver)
+            else:
+                version = ''
+            yield oid, tid, version
 
-            self._lock.acquire()
-            try:
-                seek(ofs)
-                assert read(1) == 'a', (ofs, self.f.tell(), oid)
-                size, saved_oid, tid, end_tid, lver = unpack(
-                    ">I8s8s8sh", read(30))
-                assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
-                assert end_tid == z64, (ofs, self.f.tell(), oid)
-                if lver:
-                    version = read(lver)
-                else:
-                    version = ''
-                result = oid, tid, version
-            finally:
-                self._lock.release()
-
-            yield result
-
     def dump(self):
         from ZODB.utils import oid_repr
         print "cache size", len(self)

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -158,7 +158,8 @@
         self.addr.append(self._getAddr())
 
     def _getAddr(self):
-        return 'localhost', forker.get_port()
+        # port+1 is also used, so only draw even port numbers
+        return 'localhost', random.randrange(25000, 30000, 2)
 
     def getConfig(self, path, create, read_only):
         raise NotImplementedError

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -144,25 +144,24 @@
         self.commitdict = commitdict
 
     def _testrun(self):
-        tm = transaction.TransactionManager()
-        cn = self.db.open(transaction_manager=tm)
+        cn = self.db.open()
         while not self.stop.isSet():
             try:
                 tree = cn.root()["tree"]
                 break
             except (ConflictError, KeyError):
-                tm.abort()
+                transaction.abort()
         key = self.startnum
         while not self.stop.isSet():
             try:
                 tree[key] = self.threadnum
-                tm.get().note("add key %s" % key)
-                tm.commit()
+                transaction.get().note("add key %s" % key)
+                transaction.commit()
                 self.commitdict[self] = 1
                 if self.sleep:
                     time.sleep(self.sleep)
             except (ReadConflictError, ConflictError), msg:
-                tm.abort()
+                transaction.abort()
             else:
                 self.added_keys.append(key)
             key += self.step
@@ -339,23 +338,16 @@
     def _check_threads(self, tree, *threads):
         # Make sure the thread's view of the world is consistent with
         # the actual database state.
-
         expected_keys = []
+        errormsgs = []
+        err = errormsgs.append
         for t in threads:
             if not t.added_keys:
                 err("thread %d didn't add any keys" % t.threadnum)
             expected_keys.extend(t.added_keys)
         expected_keys.sort()
-
-        for i in range(100):
-            tree._p_jar.sync()
-            actual_keys = list(tree.keys())
-            if expected_keys == actual_keys:
-                break
-            time.sleep(.1)
-        else:
-            errormsgs = []
-            err = errormsgs.append
+        actual_keys = list(tree.keys())
+        if expected_keys != actual_keys:
             err("expected keys != actual keys")
             for k in expected_keys:
                 if k not in actual_keys:
@@ -363,7 +355,8 @@
             for k in actual_keys:
                 if k not in expected_keys:
                     err("key %s in tree but not expected" % k)
-
+        if errormsgs:
+            display(tree)
             self.fail('\n'.join(errormsgs))
 
     def go(self, stop, commitdict, *threads):
@@ -495,9 +488,10 @@
         self.go(stop, cd, t1, t2, t3)
 
         while db1.lastTransaction() != db2.lastTransaction():
-            time.sleep(.1)
+            db1._storage.sync()
+            db2._storage.sync()
 
-        time.sleep(.1)
+
         cn = db1.open()
         tree = cn.root()["tree"]
         self._check_tree(cn, tree)

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -14,7 +14,6 @@
 """Library for forking storage server and connecting client storage"""
 
 import os
-import random
 import sys
 import time
 import errno
@@ -202,29 +201,3 @@
             ack = 'no ack received'
         logger.debug('shutdown_zeo_server(): acked: %s' % ack)
         s.close()
-
-def get_port():
-    """Return a port that is not in use.
-
-    Checks if a port is in use by trying to connect to it.  Assumes it
-    is not in use if connect raises an exception. We actually look for
-    2 consective free ports because most of the clients of this
-    function will use the returned port and the next one.
-
-    Raises RuntimeError after 10 tries.
-    """
-    for i in range(10):
-        port = random.randrange(20000, 30000)
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            try:
-                s.connect(('localhost', port))
-                s1.connect(('localhost', port+1))
-            except socket.error:
-                # Perhaps we should check value of error too.
-                return port
-        finally:
-            s.close()
-            s1.close()
-    raise RuntimeError("Can't find port")

Deleted: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test	2008-08-27 09:47:18 UTC (rev 90416)
@@ -1,104 +0,0 @@
-Invalidations while connecting
-==============================
-
-As soon as a client registers with a server, it will recieve
-invalidations from the server.  The client must be careful to queue
-these invalidations until it is ready to deal with them.  At the time
-of the writing of this test, clients weren't careful enogh about
-queing invalidations.  This led to cache corruption in the form of
-both low-level file corruption as well as out-of-date records marked
-as current.
-
-This tests tries to provoke this bug by:
-
-- starting a server
-
-    >>> import ZEO.tests.testZEO, ZEO.tests.forker
-    >>> addr = 'localhost', ZEO.tests.testZEO.get_port()
-    >>> zconf = ZEO.tests.forker.ZEOConfig(addr)
-    >>> sconf = '<filestorage 1>\npath Data.fs\n</filestorage>\n'
-    >>> _, adminaddr, pid, conf_path = ZEO.tests.forker.start_zeo_server(
-    ...     sconf, zconf, addr[1])
-
-- opening a client to the server that writes some objects, filling
-  it's cache at the same time,
-
-    >>> import ZEO.ClientStorage, ZODB.tests.MinPO, transaction
-    >>> db = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr, client='x'))
-    >>> conn = db.open()
-    >>> nobs = 1000
-    >>> for i in range(nobs):
-    ...     conn.root()[i] = ZODB.tests.MinPO.MinPO(0)
-    >>> transaction.commit()
-
-- disconnecting the first client (closing it with a persistent cache),
-
-    >>> db.close()
-
-- starting a second client that writes objects more or less
-  constantly,
-
-    >>> import random, threading
-    >>> stop = False
-    >>> db2 = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr))
-    >>> tm = transaction.TransactionManager()
-    >>> conn2 = db2.open(transaction_manager=tm)
-    >>> random = random.Random(0)
-    >>> lock = threading.Lock()
-    >>> def run():
-    ...     while 1:
-    ...         i = random.randint(0, nobs-1)
-    ...         if stop:
-    ...             return
-    ...         lock.acquire()
-    ...         try:
-    ...             conn2.root()[i].value += 1
-    ...             tm.commit()
-    ...         finally:
-    ...             lock.release()
-    ...             time.sleep(0)
-    >>> thread = threading.Thread(target=run)
-    >>> thread.start()
-
-- restarting the first client, and 
-- testing for cache validity.
-
-    >>> import zope.testing.loggingsupport, logging
-    >>> handler = zope.testing.loggingsupport.InstalledHandler(
-    ...    'ZEO', level=logging.ERROR)
-
-    >>> import time
-    >>> for c in range(10):
-    ...    time.sleep(.1)
-    ...    db = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr, client='x'))
-    ...    _ = lock.acquire()
-    ...    try:
-    ...      time.sleep(.1)
-    ...      assert (db._storage.lastTransaction()
-    ...              == db._storage._server.lastTransaction()), (
-    ...                  db._storage.lastTransaction(),
-    ...                  db._storage._server.lastTransactiion())
-    ...      conn = db.open()
-    ...      for i in range(1000):
-    ...        if conn.root()[i].value != conn2.root()[i].value:
-    ...            print 'bad', c, i, conn.root()[i].value,
-    ...            print  conn2.root()[i].value
-    ...    finally:
-    ...      _ = lock.release()
-    ...    db.close()
-
-    >>> stop = True
-    >>> thread.join(10)
-    >>> thread.isAlive()
-    False
-
-    >>> for record in handler.records:
-    ...     print record.name, record.levelname
-    ...     print handler.format(record)
-
-    >>> handler.uninstall()
-
-    >>> db.close()
-    >>> db2.close()
-    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr)
-

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -21,8 +21,8 @@
 import unittest
 # Import the actual test class
 from ZEO.tests import ConnectionTests, InvalidationTests
-from zope.testing import doctest, setupstack
 
+
 class FileStorageConfig:
     def getConfig(self, path, create, read_only):
         return """\
@@ -135,10 +135,6 @@
     for klass in test_classes:
         sub = unittest.makeSuite(klass, 'check')
         suite.addTest(sub)
-    suite.addTest(doctest.DocFileSuite(
-        'invalidations_while_connecting.test',
-        setUp=setupstack.setUpDirectory, tearDown=setupstack.tearDown,
-        ))
     return suite
 
 

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -18,7 +18,9 @@
 import doctest
 import logging
 import os
+import random
 import signal
+import socket
 import stat
 import tempfile
 import threading
@@ -48,7 +50,6 @@
 import ZEO.zrpc.connection
 
 from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
-from ZEO.tests.forker import get_port
 
 import ZEO.tests.ConnectionTests
 
@@ -127,6 +128,27 @@
         finally:
             storage2.close()
 
+def get_port():
+    """Return a port that is not in use.
+
+    Checks if a port is in use by trying to connect to it.  Assumes it
+    is not in use if connect raises an exception.
+
+    Raises RuntimeError after 10 tries.
+    """
+    for i in range(10):
+        port = random.randrange(20000, 30000)
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            try:
+                s.connect(('localhost', port))
+            except socket.error:
+                # Perhaps we should check value of error too.
+                return port
+        finally:
+            s.close()
+    raise RuntimeError("Can't find port")
+
 class GenericTests(
     # Base class for all ZODB tests
     StorageTestBase.StorageTestBase,

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -35,6 +35,7 @@
         # testSerialization reads the entire file into a string, it's not
         # good to leave it that big.
         self.cache = ZEO.cache.ClientCache(size=1024**2)
+        self.cache.open()
 
     def tearDown(self):
         if self.cache.path:
@@ -44,6 +45,7 @@
         self.assertEqual(self.cache.getLastTid(), None)
         self.cache.setLastTid(n2)
         self.assertEqual(self.cache.getLastTid(), n2)
+        self.cache.invalidate(n1, "", n1)
         self.assertEqual(self.cache.getLastTid(), n2)
         self.cache.invalidate(n1, "", n3)
         self.assertEqual(self.cache.getLastTid(), n3)
@@ -63,8 +65,8 @@
     def testInvalidate(self):
         data1 = "data for n1"
         self.cache.store(n1, "", n3, None, data1)
-        self.cache.invalidate(n2, "", n2)
         self.cache.invalidate(n1, "", n4)
+        self.cache.invalidate(n2, "", n2)
         self.assertEqual(self.cache.load(n1, ""), None)
         self.assertEqual(self.cache.loadBefore(n1, n4),
                          (data1, n3, n4))
@@ -140,6 +142,7 @@
         dst.write(src.read(self.cache.maxsize))
         dst.close()
         copy = ZEO.cache.ClientCache(path)
+        copy.open()
 
         # Verify that internals of both objects are the same.
         # Could also test that external API produces the same results.
@@ -155,6 +158,7 @@
         if self.cache.path:
             os.remove(self.cache.path)
         cache = ZEO.cache.ClientCache(size=50)
+        cache.open()
 
         # We store an object that is a bit larger than the cache can handle.
         cache.store(n1, '', n2, None, "x"*64)
@@ -170,6 +174,7 @@
         if self.cache.path:
             os.remove(self.cache.path)
         cache = ZEO.cache.ClientCache(size=50)
+        cache.open()
 
         # We store an object that is a bit larger than the cache can handle.
         cache.store(n1, '', n2, n3, "x"*64)
@@ -213,6 +218,7 @@
     ...     _ = os.spawnl(os.P_WAIT, sys.executable, sys.executable, 't')
     ...     if os.path.exists('cache'):
     ...         cache = ZEO.cache.ClientCache('cache')
+    ...         cache.open()
     ...         cache.close()
     ...         os.remove('cache')
     ...         os.remove('cache.lock')
@@ -232,6 +238,7 @@
     >>> cache.store(ZODB.utils.p64(1), '', ZODB.utils.p64(1), None, data)
     >>> cache.close()
     >>> cache = ZEO.cache.ClientCache('cache', 1000)
+    >>> cache.open()
     >>> cache.store(ZODB.utils.p64(2), '', ZODB.utils.p64(2), None, 'XXX')
 
     >>> cache.close()
@@ -248,57 +255,6 @@
 
     >>> cache.close()
     """,
-
-    thread_safe =
-    r"""
-
-    >>> import ZEO.cache, ZODB.utils
-    >>> cache = ZEO.cache.ClientCache('cache', 1000000)
-
-    >>> for i in range(100):
-    ...     cache.store(ZODB.utils.p64(i), '', ZODB.utils.p64(1), None, '0')
-
-    >>> import random, sys, threading
-    >>> random = random.Random(0)
-    >>> stop = False
-    >>> read_failure = None
-
-    >>> def read_thread():
-    ...     def pick_oid():
-    ...         return ZODB.utils.p64(random.randint(0,99))
-    ...
-    ...     try:
-    ...         while not stop:
-    ...             cache.load(pick_oid())
-    ...             cache.loadBefore(pick_oid(), ZODB.utils.p64(2))
-    ...             cache.modifiedInVersion(pick_oid())
-    ...     except:
-    ...         global read_failure
-    ...         read_failure = sys.exc_info()
-
-    >>> thread = threading.Thread(target=read_thread)
-    >>> thread.start()
-
-    >>> for tid in range(2,10):
-    ...     for oid in range(100):
-    ...         oid = ZODB.utils.p64(oid)
-    ...         cache.invalidate(oid, '', ZODB.utils.p64(tid))
-    ...         cache.store(oid, '', ZODB.utils.p64(tid), None, str(tid))
-
-    >>> stop = True
-    >>> thread.join()
-    >>> if read_failure:
-    ...    print 'Read failure:'
-    ...    import traceback
-    ...    traceback.print_exception(*read_failure)
-
-    >>> expected = '9', ZODB.utils.p64(9), ''
-    >>> for oid in range(100):
-    ...     loaded = cache.load(ZODB.utils.p64(oid))
-    ...     if loaded != expected:
-    ...         print oid, loaded
-    
-    """,
     )
 
 

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -447,7 +447,8 @@
         Call the client's testConnection(), giving the client a chance
         to do app-level check of the connection.
         """
-        self.conn = ManagedClientConnection(self.sock, self.addr, self.mgr)
+        self.conn = ManagedClientConnection(self.sock, self.addr,
+                                            self.client, self.mgr)
         self.sock = None # The socket is now owned by the connection
         try:
             self.preferred = self.client.testConnection(self.conn)

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -555,23 +555,14 @@
             self.replies_cond.release()
 
     def handle_request(self, msgid, flags, name, args):
-        obj = self.obj
-        
-        if name.startswith('_') or not hasattr(obj, name):
-            if obj is None:
-                if __debug__:
-                    self.log("no object calling %s%s"
-                             % (name, short_repr(args)),
-                             level=logging.DEBUG)
-                return
-                
-            msg = "Invalid method name: %s on %s" % (name, repr(obj))
+        if not self.check_method(name):
+            msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
             raise ZRPCError(msg)
         if __debug__:
             self.log("calling %s%s" % (name, short_repr(args)),
                      level=logging.DEBUG)
 
-        meth = getattr(obj, name)
+        meth = getattr(self.obj, name)
         try:
             self.waiting_for_reply = True
             try:
@@ -610,6 +601,12 @@
                  level=logging.ERROR, exc_info=True)
         self.close()
 
+    def check_method(self, name):
+        # TODO:  This is hardly "secure".
+        if name.startswith('_'):
+            return None
+        return hasattr(self.obj, name)
+
     def send_reply(self, msgid, ret):
         # encode() can pass on a wide variety of exceptions from cPickle.
         # While a bare `except` is generally poor practice, in this case
@@ -900,7 +897,7 @@
     __super_close = Connection.close
     base_message_output = Connection.message_output
 
-    def __init__(self, sock, addr, mgr):
+    def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
 
         # We can't use the base smac's message_output directly because the
@@ -917,7 +914,7 @@
         self.queue_output = True
         self.queued_messages = []
 
-        self.__super_init(sock, addr, None, tag='C', map=client_map)
+        self.__super_init(sock, addr, obj, tag='C', map=client_map)
         self.thr_async = True
         self.trigger = client_trigger
         client_trigger.pull_trigger()

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -594,14 +594,7 @@
             oid = obj._p_oid
             serial = getattr(obj, "_p_serial", z64)
 
-            if ((serial == z64)
-                and
-                ((self._savepoint_storage is None)
-                 or (oid not in self._savepoint_storage.creating)
-                 or self._savepoint_storage.creating[oid]
-                 )
-                ):
-                
+            if serial == z64:
                 # obj is a new object
 
                 # Because obj was added, it is now in _creating, so it

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -531,10 +531,6 @@
 LAYOUTS['lawn'] = LawnLayout()
 
 
-class BlobStorageError(Exception):
-    """The blob storage encountered an invalid state."""
-
-
 class BlobStorage(SpecificationDecoratorBase):
     """A storage to support blobs."""
 
@@ -542,8 +538,7 @@
 
     # Proxies can't have a __dict__ so specifying __slots__ here allows
     # us to have instance attributes explicitly on the proxy.
-    __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo',
-                 '_blobs_pack_is_in_progress', )
+    __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo')
 
     def __new__(self, base_directory, storage, layout='automatic'):
         return SpecificationDecoratorBase.__new__(self, storage)
@@ -562,7 +557,6 @@
         else:
             supportsUndo = supportsUndo()
         self.__supportsUndo = supportsUndo
-        self._blobs_pack_is_in_progress = False
 
     @non_overridable
     def temporaryDirectory(self):
@@ -668,29 +662,21 @@
 
     @non_overridable
     def pack(self, packtime, referencesf):
-        """Remove all unused OID/TID combinations."""
-        self._lock_acquire()
-        try:
-            if self._blobs_pack_is_in_progress:
-                raise BlobStorageError('Already packing')
-            self._blobs_pack_is_in_progress = True
-        finally:
-            self._lock_release()
+        """Remove all unused oid/tid combinations."""
+        unproxied = getProxiedObject(self)
 
-        try:
-            # Pack the underlying storage, which will allow us to determine
-            # which serials are current.
-            unproxied = getProxiedObject(self)
-            result = unproxied.pack(packtime, referencesf)
+        # pack the underlying storage, which will allow us to determine
+        # which serials are current.
+        result = unproxied.pack(packtime, referencesf)
 
-            # Perform a pack on the blob data.
+        # perform a pack on blob data
+        self._lock_acquire()
+        try:
             if self.__supportsUndo:
                 self._packUndoing(packtime, referencesf)
             else:
                 self._packNonUndoing(packtime, referencesf)
         finally:
-            self._lock_acquire()
-            self._blobs_pack_is_in_progress = False
             self._lock_release()
 
         return result

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt	2008-08-27 09:47:18 UTC (rev 90416)
@@ -240,37 +240,6 @@
     >>> os.path.exists(os.path.split(fns[0])[0])
     False
 
-Avoiding parallel packs
-=======================
-
-Blob packing (similar to FileStorage) can only be run once at a time. For
-this, a flag (_blobs_pack_is_in_progress) is set. If the pack method is called
-while this flag is set, it will refuse to perform another pack, until the flag
-is reset:
-
-    >>> blob_storage._blobs_pack_is_in_progress
-    False
-    >>> blob_storage._blobs_pack_is_in_progress = True
-    >>> blob_storage.pack(packtime, referencesf)
-    Traceback (most recent call last):
-    BlobStorageError: Already packing
-    >>> blob_storage._blobs_pack_is_in_progress = False
-    >>> blob_storage.pack(packtime, referencesf)
-
-We can also see, that the flag is set during the pack, by leveraging the
-knowledge that the underlying storage's pack method is also called:
-
-    >>> def dummy_pack(time, ref):
-    ...     print "_blobs_pack_is_in_progress =", blob_storage._blobs_pack_is_in_progress
-    ...     return base_pack(time, ref)
-    >>> base_pack = base_storage.pack
-    >>> base_storage.pack = dummy_pack
-    >>> blob_storage.pack(packtime, referencesf)
-    _blobs_pack_is_in_progress = True
-    >>> blob_storage._blobs_pack_is_in_progress
-    False
-    >>> base_storage.pack = base_pack
-
 Clean up our blob directory:
 
     >>> shutil.rmtree(blob_dir)

Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py	2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py	2008-08-27 09:47:18 UTC (rev 90416)
@@ -146,32 +146,6 @@
 
 """
 
-def test_explicit_adding_with_savepoint2():
-    """
-
-    >>> import ZODB.tests.util, transaction, persistent
-    >>> databases = {}
-    >>> db1 = ZODB.tests.util.DB(databases=databases, database_name='1')
-    >>> db2 = ZODB.tests.util.DB(databases=databases, database_name='2')
-    >>> tm = transaction.TransactionManager()
-    >>> conn1 = db1.open(transaction_manager=tm)
-    >>> conn2 = conn1.get_connection('2')
-    >>> z = MyClass()
-
-    >>> conn1.root()['z'] = z
-    >>> conn1.add(z)
-    >>> s = tm.savepoint()
-    >>> conn2.root()['z'] = z
-    >>> z.x = 1
-    >>> tm.commit()
-    >>> z._p_jar.db().database_name
-    '1'
-    
-    >>> db1.close()
-    >>> db2.close()
-
-"""
-
 def tearDownDbs(test):
     test.globs['db1'].close()
     test.globs['db2'].close()



More information about the Zodb-checkins mailing list