[Zodb-checkins] CVS: ZEO/ZEO/tests - stress.py:1.2 Cache.py:1.4 forker.py:1.10 testZEO.py:1.16

Jeremy Hylton jeremy@zope.com
Fri, 2 Nov 2001 15:54:21 -0500


Update of /cvs-repository/ZEO/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv14473

Modified Files:
	Cache.py forker.py testZEO.py 
Added Files:
	stress.py 
Log Message:
Merge changes from zeo-1_0-branch to the trunk.

The trunk now has the same code as ZEO 1.0b5 plus a few minor changes.


=== ZEO/ZEO/tests/stress.py 1.1 => 1.2 ===
+
+The stress test should run in an infinite loop and should involve
+multiple connections.
+"""
+from __future__ import nested_scopes
+
+import ZODB
+from ZEO.ClientStorage import ClientStorage
+from ZODB.MappingStorage import MappingStorage
+from ZEO.tests import forker
+from ZODB.tests import MinPO
+import zLOG
+
+import os
+import random
+import sys
+import types
+
+NUM_TRANSACTIONS_PER_CONN = 10
+NUM_CONNECTIONS = 10
+NUM_ROOTS = 20
+MAX_DEPTH = 20
+MIN_OBJSIZE = 128
+MAX_OBJSIZE = 2048
+
+def an_object():
+    """Return an object suitable for a PersistentMapping key"""
+    size = random.randrange(MIN_OBJSIZE, MAX_OBJSIZE)
+    if os.path.exists("/dev/urandom"):
+        f = open("/dev/urandom")
+        buf = f.read(size)
+        f.close()
+        return buf
+    else:
+        f = open(MinPO.__file__)
+        l = list(f.read(size))
+        f.close()
+        random.shuffle(l)
+        return "".join(l)
+
+def setup(cn):
+    """Initialize the database with some objects"""
+    root = cn.root()
+    for i in range(NUM_ROOTS):
+        prev = an_object()
+        for j in range(random.randrange(1, MAX_DEPTH)):
+            o = MinPO.MinPO(prev)
+            prev = o
+        root[an_object()] = o
+        get_transaction().commit()
+    cn.close()
+
+def work(cn):
+    """Do some work with a transaction"""
+    cn.sync()
+    root = cn.root()
+    obj = random.choice(root.values())
+    # walk down to the bottom
+    while not isinstance(obj.value, types.StringType):
+        obj = obj.value
+    obj.value = an_object()
+    get_transaction().commit()
+
+def main():
+    # Yuck!  Need to cleanup forker so that the API is consistent
+    # across Unix and Windows, at least if that's possible.
+    if os.name == "nt":
+        zaddr, tport, pid = forker.start_zeo_server('MappingStorage', ())
+        def exitserver():
+            import socket
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            s.connect(tport)
+            s.close()
+    else:
+        zaddr = '', random.randrange(20000, 30000)
+        pid, exitobj = forker.start_zeo_server(MappingStorage(), zaddr)
+        def exitserver():
+            exitobj.close()
+
+    while 1:
+        pid = start_child(zaddr)
+        print "started", pid
+        os.waitpid(pid, 0)
+
+    exitserver()
+
+def start_child(zaddr):
+
+    pid = os.fork()
+    if pid != 0:
+        return pid
+    
+    storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5)
+    db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
+    setup(db.open())
+    conns = []
+    conn_count = 0
+
+    for i in range(NUM_CONNECTIONS):
+        c = db.open()
+        c.__count = 0
+        conns.append(c)
+        conn_count += 1
+
+    while conn_count < 25:
+        c = random.choice(conns)
+        if c.__count > NUM_TRANSACTIONS_PER_CONN:
+            conns.remove(c)
+            c.close()
+            conn_count += 1
+            c = db.open()
+            c.__count = 0
+            conns.append(c)
+        else:
+            c.__count += 1
+        work(c)
+
+    os._exit(0)
+
+if __name__ == "__main__":
+    main()


=== ZEO/ZEO/tests/Cache.py 1.3 => 1.4 ===
 
         info = self._storage.undoInfo()
+        if not info:
+            # XXX perhaps we have an old storage implementation that
+            # does do the negative nonsense
+            info = self._storage.undoInfo(0, 20)
         tid = info[0]['id']
+
+        # We may need to bail at this point if the storage doesn't
+        # support transactional undo
+        if not self._storage.supportsTransactionalUndo():
+            return
+
         # Now start an undo transaction
         self._transaction.note('undo1')
         self._storage.tpc_begin(self._transaction)


=== ZEO/ZEO/tests/forker.py 1.9 => 1.10 ===
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         try:
-            s.connect(('localhost', port))
-        except socket.error:
-            # XXX check value of error?
-            return port
+            try:
+                s.connect(('localhost', port))
+            except socket.error:
+                # XXX check value of error?
+                return port
+        finally:
+            s.close()
     raise RuntimeError, "Can't find port"
 
 if os.name == "nt":
@@ -37,14 +40,15 @@
         Returns the ZEO port, the test server port, and the pid.
         """
         import ZEO.tests.winserver
-        port = get_port()
+        if port is None:
+            port = get_port()
         script = ZEO.tests.winserver.__file__
         if script.endswith('.pyc'):
             script = script[:-1]
         args = (sys.executable, script, str(port), storage_name) + args
         d = os.environ.copy()
         d['PYTHONPATH'] = os.pathsep.join(sys.path)
-        pid = os.spawnve(os.P_NOWAIT, sys.executable, args, d)
+        pid = os.spawnve(os.P_NOWAIT, sys.executable, args, os.environ)
         return ('localhost', port), ('localhost', port + 1), pid
 
 else:
@@ -74,6 +78,7 @@
 
         def close(self):
             os.write(self.pipe, "done")
+            os.close(self.pipe)
 
     def start_zeo_server(storage, addr):
         rd, wr = os.pipe()
@@ -97,6 +102,7 @@
         ZEOServerExit(rd)
         serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
         asyncore.loop()
+        os.close(rd)
         storage.close()
         if isinstance(addr, types.StringType):
             os.unlink(addr)


=== ZEO/ZEO/tests/testZEO.py 1.15 => 1.16 ===
                 d[oid] = serial
         return d
+
+# Some of the ZEO tests depend on the version of FileStorage available
+# for the tests.  If we run these tests using Zope 2.3, FileStorage
+# doesn't support TransactionalUndo.
+
+if hasattr(FileStorage, 'supportsTransactionalUndo'):
+    # XXX Assume that a FileStorage that supports transactional undo
+    # also supports conflict resolution.
+    class VersionDependentTests(
+        TransactionalUndoStorage.TransactionalUndoStorage,
+        TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
+        ConflictResolution.ConflictResolvingStorage,
+        ConflictResolution.ConflictResolvingTransUndoStorage):
+        pass
+else:
+    class VersionDependentTests:
+        pass
         
 class GenericTests(ZEOTestBase,
+                   VersionDependentTests,
                    Cache.StorageWithCache,
                    Cache.TransUndoStorageWithCache,
                    BasicStorage.BasicStorage,
@@ -107,10 +125,6 @@
                    RevisionStorage.RevisionStorage,
                    PackableStorage.PackableStorage,
                    Synchronization.SynchronizedStorage,
-                   ConflictResolution.ConflictResolvingStorage,
-                   ConflictResolution.ConflictResolvingTransUndoStorage,
-                   TransactionalUndoStorage.TransactionalUndoStorage,
-      TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
                    ):
     """An abstract base class for ZEO tests
 
@@ -187,7 +201,7 @@
         zeo_addr, self.test_addr, self.test_pid = \
                   forker.start_zeo_server(name, args)
         storage = ZEO.ClientStorage.ClientStorage(zeo_addr, debug=1,
-                                                  min_disconnect_poll=0.5)
+                                                  min_disconnect_poll=0.1)
         self._storage = PackWaitWrapper(storage)
         storage.registerDB(DummyDB(), None)
 
@@ -195,6 +209,7 @@
         self._storage.close()
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         s.connect(self.test_addr)
+        s.close()
         # the connection should cause the storage server to die
 ##        os.waitpid(self.test_pid, 0)
         time.sleep(0.5)