[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py Use multiprocessing to run replay clients in separate processes to overcome the GIL.

Jim Fulton jim at zope.com
Tue Jan 12 10:03:19 EST 2010


Log message for revision 108062:
  Use multiprocessing to run replay clients in separate processes to
  overcome the GIL.
  

Changed:
  U   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py

-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py	2010-01-12 14:15:00 UTC (rev 108061)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py	2010-01-12 15:03:19 UTC (rev 108062)
@@ -50,6 +50,7 @@
 import cPickle
 import logging
 import marshal
+import multiprocessing
 import os
 import sys
 import threading
@@ -65,6 +66,8 @@
 import ZODB.TimeStamp
 import ZODB.utils
 
+sys.setcheckinterval(999)
+
 logging.basicConfig()
 
 def time_stamp(timetime):
@@ -270,23 +273,28 @@
     closed = 0
     queueing = True
 
-    def __init__(self, addr, session=None, handlers=None):
-        if handlers is None:
-            handlers = Handlers()
-        self.handlers = handlers
+    def __init__(self, addr, session, inq, outq):
         self.session = session
         self.addr = addr
         self.event = threading.Event()
         self.queue = []
         self.messages = {}
-        self.times = handlers.times
-        self.active = handlers.active
-        self.errtimes = handlers.errtimes
+        def output(op, *args):
+            outq.put((op, args))
+        self.output = output
+        self.outq = outq
         self.lock = threading.Lock()
-        zc.ngi.async.connect(addr, self)
+        self.ngi = zc.ngi.async.SelectImplementation()
+        self.ngi.connect(addr, self)
+        self.event.wait()
+        while 1:
+            callargs = inq.get()
+            if callargs == 'stop':
+                break
+            self.call(*callargs)
 
     def connected(self, connection):
-        self.handlers.connected += 1
+        self.output('connect')
         self.protocol = None
         self.connection = zc.ngi.adapters.Sized(connection)
         self.connection.setHandler(self)
@@ -303,12 +311,11 @@
         messages = sorted((v[2], v[0], v[1]) for v in self.messages.values())
         print time.ctime(), self.session, [v[:2] for v in messages]
         redo = [(0, v[1], v[2]) for v in messages
-                if v[1] in ('sendBlob', 'loadEx')]
+                if v[1] in ('sendBlob', 'loadEx', 'loadBefore')]
         self.queue.extend(redo)
-        self.handlers.abandoned += len(messages) - len(redo)
+        self.output('disconnect', len(messages) - len(redo))
         self.messages.clear()
-        zc.ngi.async.connect(self.addr, self)
-        self.handlers.connected -= 1
+        self.ngi.connect(self.addr, self)
 
     def stop_queueing(self):
         assert self.queueing
@@ -325,8 +332,6 @@
             self.event.set()
             return
 
-        self.handlers.inputs += 1
-
         try:
             msgid, flags, op, args = cPickle.loads(message)
         except:
@@ -334,27 +339,11 @@
             traceback.print_exception(*sys.exc_info())
             return
 
-        #print '  got', self.session, msgid, flags, op
         if (op == '.reply'):
             ret = args
             op, args, start = self.messages.pop(msgid)
             elapsed = time.time()-start
-            #print '    reply', op, [
-            #    (v[0], v[2]) for v in self.messages.values()
-            #    ], elapsed
-            self.handlers.replies += 1
-            self.active.remove(self.session)
-            if (isinstance(ret, tuple)
-                and len(ret) == 2
-                and isinstance(ret[1], Exception)
-                ):
-                n, t = self.errtimes.get(op, zz)
-                self.errtimes[op] = n+1, t+elapsed
-                print '  OOPS', op, args, elapsed, ret[0].__name__, ret[1]
-                self.handlers.errors += 1
-            else:
-                n, t = self.times.get(op, zz)
-                self.times[op] = n+1, t+elapsed
+            self.output('reply', op, args, ret, elapsed)
 
             self.stop_queueing()
 
@@ -376,24 +365,72 @@
             if not async:
                 #print '    prev out', self.session, [
                 #    (v[0], v[2]) for v in self.messages.values()]
+                self.queueing = True
                 self.messages[self.msgid] = op, args, time.time()
-                self.queueing = True
-                self.active.add(self.session)
 
             self.connection.write(cPickle.dumps((self.msgid, async, op, args)))
 
+            if not async:
+                self.output('request', op, args)
+
 zz = 0, 0
 
 class Handlers:
 
-    connected = async = calls = inputs = replies = errors = pending = 0
-    abandoned = 0
+    async = abandoned = 0
 
-    def __init__(self):
+    def __init__(self, disconnected):
         self.errtimes = {}
         self.times = {}
-        self.active = set()
+        self.disconnected = disconnected
+        self.connected = self.active = self.calls = self.replies = 0
+        self.errors = 0
+        self.event = threading.Event()
 
+    def __repr__(self):
+        return ("%(connected)s %(disconnected)s %(active)s"
+                " %(calls)s %(replies)s %(errors)s"
+                % self.__dict__)
+
+    def run(self, queue):
+        while 1:
+            got = queue.get()
+            op, args = got
+            getattr(self, op)(*args)
+
+    def connect(self):
+        self.disconnected -= 1
+        if not self.disconnected:
+            self.event.set()
+        self.connected += 1
+
+    def disconnect(self, abandoned):
+        self.disconnected += 1
+        self.connected -= 1
+        self.abandoned += abandoned
+        self.active -= abandoned
+
+    def request(self, op, args):
+        self.active += 1
+        self.calls += 1
+
+    def reply(self, op, args, ret, elapsed):
+        self.active -= 1
+        self.replies += 1
+        if (isinstance(ret, tuple)
+            and len(ret) == 2
+            and isinstance(ret[1], Exception)
+            ):
+            n, t = self.errtimes.get(op, zz)
+            self.errtimes[op] = n+1, t+elapsed
+            print '  OOPS', op, args, elapsed, ret[0].__name__, ret[1]
+            self.errors += 1
+        else:
+            n, t = self.times.get(op, zz)
+            self.times[op] = n+1, t+elapsed
+
+
+
 def parse_addr(addr):
     addr = addr.split(':')
     return addr[0], int(addr[1])
@@ -439,22 +476,29 @@
 
     log = Log(log)
 
-    handlers = Handlers()
-
     # Set up the client connections
     sessions = {}
     nhandlers = 0
+    handlers_queue = multiprocessing.Queue()
     for session, timetime, msgid, async, op, args in log:
         if session not in sessions:
-            handler = Handler(addr, nhandlers, handlers)
-            sessions[session] = handler
+            handler_queue = multiprocessing.Queue()
+            process = multiprocessing.Process(
+                target = Handler,
+                args = (addr, nhandlers, handler_queue, handlers_queue),
+                ).start()
+            sessions[session] = handler_queue
             nhandlers += 1
 
-    for handler in sessions.values():
-        handler.event.wait(10)
-        if not handler.event.is_set():
-            raise ValueError("Couldn't connect.")
+    handlers = Handlers(len(sessions))
+    thread = threading.Thread(target=handlers.run, args=(handlers_queue, ))
+    thread.setDaemon(True)
+    thread.start()
 
+    handlers.event.wait(10)
+    if not handlers.event.is_set():
+        raise ValueError("Couldn't connect.", handlers)
+
     # Now, we're ready to replay.
 
     cs = ZEO.ClientStorage.ClientStorage(addr)
@@ -469,10 +513,9 @@
     for t in Transactions(source):
 
         sys.stdout.flush()
-        pending = handlers.calls - handlers.replies - handlers.abandoned
-        while pending > 10000:
+        pending = handlers.active - handlers.abandoned
+        while handlers.active > 10000:
             time.sleep(.01)
-            pending = handlers.calls - handlers.replies - handlers.abandoned
 
         if nt and (nt%1000 == 0):
             last_times = print_times(last_times, handlers.times,
@@ -490,21 +533,17 @@
         lasttt = tt
         work = nt + nr + handlers.calls + handlers.async
         print nt, time.strftime('%H:%M:%S', time.localtime(time.time())),
-        print ZODB.TimeStamp.TimeStamp(time_stamp(tt)),
-        print handlers.connected, len(handlers.active), handlers.calls,
-        print handlers.replies,
-        print handlers.errors, handlers.async, pending, speed, speed1
+        print ZODB.TimeStamp.TimeStamp(time_stamp(tt)), handlers,
+        print speed, speed1
         while logrecord[1] < tt:
             ni += 1
             session, _, _, async, op, args = logrecord
             logrecord = logiter.next()
             if op in ('getAuthProtocol', 'register', 'tpc_finish'):
                 continue
-            sessions[session].call(async, op, args)
+            sessions[session].put((async, op, args))
             if async:
                 handlers.async += 1
-            else:
-                handlers.calls += 1
             #print op, args
             if op == 'vote':
                 handlers.async += 1



More information about the checkins mailing list