[Zodb-checkins] CVS: Packages/ZEO - speed.py:1.3

jeremy@digicool.com jeremy@digicool.com
Tue, 1 May 2001 18:41:27 -0400 (EDT)


Update of /cvs-repository/Packages/ZEO/tests
In directory korak:/tmp/cvs-serv20967/tests

Modified Files:
	speed.py 
Log Message:
Modify to use forker
Add -t n flag to specify number of concurrent threads
Add -U flag to specify a Unix domain socket






--- Updated File speed.py in package Packages/ZEO --
--- speed.py	2001/05/01 19:50:42	1.2
+++ speed.py	2001/05/01 22:41:26	1.3
@@ -106,6 +106,10 @@
     -M         Output means only
 
     -C         Run with a persistent client cache
+
+    -U         Run ZEO using a Unix domain socket
+
+    -t n       Number of concurrent threads to run.
 """
 
 import asyncore  
@@ -115,6 +119,8 @@
 import ZODB, ZODB.FileStorage
 import Persistence
 import ZEO.ClientStorage, ZEO.StorageServer
+from ZEO.tests import forker
+from ZODB.POSException import ConflictError
 
 class P(Persistence.Persistent):
     pass
@@ -141,8 +147,49 @@
         os.unlink(fs_name + ".lock")
         os.unlink(fs_name + ".tmp")
 
+def work(db, results, nrep, compress, data, detailed, minimize, threadno=None):
+    for j in range(nrep):
+        for r in 1, 10, 100, 1000:
+            t = time.time()
+            
+            jar = db.open()
+            while 1:
+                try:
+                    get_transaction().begin()
+                    rt = jar.root()
+                    key = 's%s' % r
+                    if rt.has_key(key):
+                        p = rt[key]
+                    else:
+                        rt[key] = p =P()
+                    for i in range(r):
+                        v = getattr(p, str(i), P())
+                        if compress is not None:
+                            v.d = compress(data)
+                        else:
+                            v.d = data
+                        setattr(p, str(i), v)
+                    get_transaction().commit()
+                except ConflictError:
+                    pass
+                else:
+                    break
+            jar.close()
+            
+            t = time.time() - t
+            if detailed:
+                if threadno is None:
+                    print "%s\t%s\t%.4f" % (j, r, t)
+                else:
+                    print "%s\t%s\t%.4f\t%d" % (j, r, t, threadno)
+            results[r] = results[r] + t
+            rt=d=p=v=None # release all references
+            if minimize:
+                time.sleep(3)
+                jar.cacheMinimize(3)
+
 def main(args):
-    opts, args = getopt.getopt(args, 'zd:n:Ds:LM')
+    opts, args = getopt.getopt(args, 'zd:n:Ds:LMt:U')
     s = None
     compress = None
     data=sys.argv[0]
@@ -150,6 +197,8 @@
     minimize=0
     detailed=1
     cache = None
+    domain = 'AF_INET'
+    threads = 1
     for o, v in opts:
         if o=='-n': nrep = int(v)
         elif o=='-d': data = v
@@ -168,79 +217,50 @@
             debug = 1
         elif o == '-C':
             cache = 'speed'
+        elif o == '-U':
+            domain = 'AF_UNIX'
+        elif o == '-t':
+            threads = int(v)
 
     zeo_pipe = None
     if s:
         s = __import__(s, globals(), globals(), ('__doc__',))
         s = s.Storage
+        server = None
     else:
-        rd, wr = os.pipe()
-        pid = os.fork()
-        if pid:
-            # in the child, run the storage server
-            os.close(wr)
-            import asyncore
-            ZEOExit(rd)
-            fs = ZODB.FileStorage.FileStorage(fs_name, create=1)
-            serv = ZEO.StorageServer.StorageServer(('', 1975), {'1':fs})
-            asyncore.loop()
-        else:
-            os.close(rd)
-            zeo_pipe = wr
-            s = ZEO.ClientStorage.ClientStorage(('', 1975), debug=0,
-                                                client=cache)
-            if hasattr(s, 'is_connected'):
-                while not s.is_connected():
-                    time.sleep(0.1)
-            else:
-                time.sleep(1.0)
+        fs = ZODB.FileStorage.FileStorage(fs_name, create=1)
+        s, server, pid = forker.start_zeo(fs, domain=domain)
 
     data=open(data).read()
     db=ZODB.DB(s,
                # disable cache deactivation
                cache_size=4000,
                cache_deactivate_after=6000,)
-    db.open().root()
 
+    print "Beginning work..."
     results={1:0, 10:0, 100:0, 1000:0}
-    for j in range(nrep):
-        for r in 1, 10, 100, 1000:
-            t = time.time()
-            
-            jar = db.open()
-            get_transaction().begin()
-            rt = jar.root()
-            key = 's%s' % r
-            if rt.has_key(key):
-                p = rt[key]
-            else:
-                rt[key] = p =P()
-            for i in range(r):
-                v = getattr(p, str(i), P())
-                if compress is not None:
-                    v.d = compress(data)
-                else:
-                    v.d = data
-                setattr(p, str(i), v)
-            get_transaction().commit()
-            jar.close()
-            
-            t = time.time() - t
-            if detailed:
-                print "%s\t%s\t%.4f" % (j, r, t)
-            results[r] = results[r] + t
-            rt=d=p=v=None # release all references
-            if minimize:
-                time.sleep(3)
-                jar.cacheMinimize(3)
+    if threads > 1:
+        import threading
+        l = [threading.Thread(target=work,
+                              args=(db, results, nrep, compress, data,
+                                    detailed, minimize, i))
+             for i in range(threads)]
+        for t in l:
+            t.start()
+        for t in l:
+            t.join()
+
+    else:
+        work(db, results, nrep, compress, data, detailed, minimize)
 
-    if zeo_pipe:
-        os.write(zeo_pipe, "done")
+    if server is not None:
+        server.close()
+        os.waitpid(pid, 0)
 
     if detailed:
         print '-'*24
     for r in 1, 10, 100, 1000:
-        t=results[r]/nrep
+        t=results[r]/(nrep * threads)
         print "mean:\t%s\t%.4f\t%.4f (s/o)" % (r, t, t/r)
     
 ##def compress(s):