[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py Added support for coordinating workers across multiple machines using

Jim Fulton jim at zope.com
Thu Aug 11 16:55:14 EDT 2011


Log message for revision 122557:
  Added support for coordinating workers across multiple machines using
  0mq.
  

Changed:
  U   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py

-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-08-11 20:14:37 UTC (rev 122556)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-08-11 20:55:14 UTC (rev 122557)
@@ -11,18 +11,15 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-"""Replay ZEO input logs in read-only mode
-"""
+"Replay ZEO input logs in read-only mode"
 
-from multiprocessing import Process, Queue
-# from threading import Thread as Process
-# from Queue import Queue
-
 import cPickle
 import logging
 import marshal
+import multiprocessing
 import optparse
 import os
+import Queue
 import sys
 import tempfile
 import threading
@@ -141,6 +138,98 @@
                  ),
                 out)
 
+def zmqProcessFactory(zmqaddr):
+    # Return a callable with the same (target, args) calling convention as
+    # multiprocessing.Process, but which wraps the target in zmqWorker so it
+    # runs connected to the 0mq boss listening at zmqaddr.
+    return (lambda target, args:
+            multiprocessing.Process(
+                target=zmqWorker, args = (zmqaddr, target, args))
+            )
+
+def zmqWorker(zmqaddr, target, args):
+    # Run a handler (target) in a worker process, substituting a single
+    # zmqQueue connected to the boss for the handler's two local queues.
+    import zmq
+    context = zmq.Context()
+    socket = context.socket(zmq.XREQ)
+    socket.connect(zmqaddr)
+
+    # args ends with (..., session, handler_queue, handlers_queue); send the
+    # session id to the boss so it can route messages back to this worker.
+    session = args[-3]
+    socket.send(cPickle.dumps(session, 1))
+
+    queue = zmqQueue(socket)
+
+    # The zmqQueue stands in for both the per-session input queue and the
+    # shared output queue in the handler's signature.
+    target(*(args[:-2]+(queue, queue)))
+
+class zmqQueue:
+    # Minimal Queue-like adapter over a 0mq socket: get() receives and
+    # unpickles one message, put() pickles and sends one message.
+
+    def __init__(self, socket):
+        self.socket = socket
+        self._send = socket.send
+
+    def get(self, block=True, timeout=None):
+        # NOTE(review): `block` is ignored, and `timeout*1000` raises
+        # TypeError when timeout is None (the default) -- presumably callers
+        # always pass a timeout; confirm.  Also, zmq's socket.recv takes a
+        # flags argument, not a timeout in ms -- verify intended semantics.
+        return cPickle.loads(self.socket.recv(timeout*1000))
+
+    def put(self, data):
+        self._send(cPickle.dumps(data, 1))
+
+class zmqBoss:
+    # Coordinates remote zmq workers: collects session registrations during a
+    # startup window, then pumps pickled messages between the local queues and
+    # the workers over an XREP socket in a daemon thread.
+
+    def __init__(self, zmqaddr, sessions, handlers_queue):
+        import zmq
+        context = zmq.Context()
+        socket = context.socket(zmq.XREP)
+        socket.bind(zmqaddr)
+        time.sleep(10) # Give lots of time to hook up w workers
+        poller = self.poller = zmq.Poller()
+        poller.register(socket, zmq.POLLIN)
+        session_addrs = {}
+        # Local queue of (worker address, object) pairs waiting to be sent.
+        handler_queue = Queue.Queue()
+        # Registration loop: drain worker announcements until the socket has
+        # been quiet for a full second, then assume everyone has checked in.
+        while 1:
+            ready = dict(poller.poll(1000))
+            if ready.get(socket) == zmq.POLLIN:
+                addr, session = socket.recv_multipart()
+                session = cPickle.loads(session)
+                if isinstance(session, int):
+                    # Session-id announcement: route that session's messages
+                    # to the announcing worker's zmq address.
+                    session_addrs[session] = addr
+                    sessions[session] = self.session_put(handler_queue, addr)
+                else:
+                    # Anything non-int is handler output; relay it locally.
+                    handlers_queue.put(session)
+            else:
+                break
+
+        print 'Got', len(sessions), 'zmq workers'
+        thread = threading.Thread(
+            target = self.run,
+            args = (poller, socket, handler_queue, handlers_queue),
+            )
+        thread.daemon = True
+        thread.start()
+
+    def session_put(self, queue, addr):
+        # Return a put callable bound to one worker address; used as the
+        # per-session send function stored in the sessions mapping.
+
+        def put(ob):
+            queue.put((addr, ob))
+
+        return put
+
+    def run(self, poller, socket, handler_queue, handlers_queue):
+        # Message pump: prefer sending queued outbound messages to workers;
+        # when there are none, poll briefly and relay worker replies onto the
+        # local handlers_queue.
+        import zmq
+        while 1:
+            try:
+                addr, ob = handler_queue.get(False)
+            except Queue.Empty:
+                pass
+            else:
+                socket.send_multipart([addr, cPickle.dumps(ob, 1)])
+                continue
+
+            # Nothing to send; poll briefly (10ms) for inbound traffic.
+            ready = dict(poller.poll(10))
+            if ready.get(socket) == zmq.POLLIN:
+                _, data = socket.recv_multipart()
+                data = cPickle.loads(data)
+                if isinstance(data, int):
+                    # A session announcement after the registration window
+                    # closed; it is reported but not registered.
+                    print 'Late session', data
+                else:
+                    handlers_queue.put(data)
+
+
 class Handler:
 
     msgid = 0
@@ -401,8 +490,14 @@
     return times
 
 parser = optparse.OptionParser("""
-Usage: %prog [options] address log
+Usage:
 
+    %prog [options] address log sessions
+
+    %prog --zmq-boss tcp://IP:PORT [options] log
+
+    %prog --zmq-worker tcp://IP:PORT [options] address sessions
+
     Where:
 
        address
@@ -410,6 +505,15 @@
 
        log
           zeo input log
+
+       sessions
+          A file containing a list of session identifiers.
+
+
+    To use 0mq, start one or more worker processes and then start a
+    boss process.  When specifying an IP, an IP #, rather than a host
+    name must be used.
+
 """)
 parser.add_option("--max_records", "-m", dest='max_records',
                   type="int", default=999999999,
@@ -429,64 +533,90 @@
                   help="Port to get status data from.")
 
 
+parser.add_option("--zmq-boss");
+parser.add_option("--zmq-worker");
+
+
 def main(args=None):
     if args is None:
         args = sys.argv[1:]
 
 
     # Maybe add options for following:
-    singe_threaded = False
     simulate_ssd = False
 
     print "$Id$"
     print args
 
     options, args = parser.parse_args(args)
-    addr, log, sessionids = args
+
+    log = addr = sessionids = ()
+    if options.zmq_boss:
+        [log] = args
+    else:
+        if options.zmq_worker:
+            addr, sessionids = args
+        else:
+            addr, log, sessionids = args
+
+    if log:
+        log = Log(log, set(('loadEx', 'sendBlob')))
+    if addr:
+        addr = parse_addr(addr)
+    if sessionids:
+        sessionids = open(sessionids).read().strip().split()
+
     max_records = options.max_records
 
-    addr = parse_addr(addr)
 
-    log = Log(log, set(('loadEx', 'sendBlob')))
-    sessionids = open(sessionids).readlines()
-
     # Set up the client connections
     sessions = {}
-    nhandlers = 0
-    handlers_queue = Queue()
+    handlers_queue = multiprocessing.Queue()
     processes = []
+
+    if options.zmq_worker:
+        process_factory = zmqProcessFactory(options.zmq_worker)
+    else:
+        process_factory = multiprocessing.Process
+
     for session in sessionids:
         session = int(session.strip())
-        if singe_threaded:
-            session = '1'
+
         if session not in sessions:
-            handler_queue = Queue()
+            handler_queue = multiprocessing.Queue()
             if options.s3:
-                process = Process(
+                process = process_factory(
                     target = S3Handler,
                     args = (options.s3,
-                            addr, nhandlers, handler_queue, handlers_queue),
+                            addr, session, handler_queue, handlers_queue),
                     )
             elif options.blob_url:
-                process = Process(
+                process = process_factory(
                     target = HTTPHandler,
                     args = (options.blob_url,
-                            addr, nhandlers, handler_queue, handlers_queue),
+                            addr, session, handler_queue, handlers_queue),
                     )
             else:
-                process = Process(
+                process = process_factory(
                     target = Handler,
-                    args = (addr, nhandlers, handler_queue, handlers_queue),
+                    args = (addr, session, handler_queue, handlers_queue),
                     )
-            process.daemon = True
+            process.daemon = not options.zmq_worker
             process.start()
             processes.append(process)
-            sessions[session] = handler_queue
-            nhandlers += 1
+            sessions[session] = handler_queue.put
 
+    if options.zmq_worker:
+        return
+
+    if options.zmq_boss:
+        handlers_queue = Queue.Queue()
+        zmqBoss(options.zmq_boss, sessions, handlers_queue)
+
     nsessions = len(sessions)
     handlers = Handlers(nsessions)
     thread = threading.Thread(target=handlers.run, args=(handlers_queue, ))
+    thread.daemon = True
     thread.start()
 
     handlers.event.wait(10)
@@ -504,9 +634,6 @@
         if session not in sessions:     # Skip unknown sessions
             continue
 
-        if singe_threaded:
-            session = '1'
-
         if simulate_ssd:
             if op == 'loadEx':
                 args = (ZODB.utils.z64,)
@@ -552,12 +679,12 @@
 
         handlers.maxactive = 0
 
-        sessions[session].put((async, op, args))
+        sessions[session]((async, op, args))
         if nrecords >= max_records:
             break
 
-    for q in sessions.values():
-        q.put('stop')
+    for put in sessions.values():
+        put('stop')
 
     print '='*70
 



More information about the checkins mailing list