[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py Added support for coordinating workers across multiple machines using
Jim Fulton
jim at zope.com
Thu Aug 11 16:55:14 EDT 2011
Log message for revision 122557:
Added support for coordinating workers across multiple machines using
0mq.
Changed:
U zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-08-11 20:14:37 UTC (rev 122556)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-08-11 20:55:14 UTC (rev 122557)
@@ -11,18 +11,15 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Replay ZEO input logs in read-only mode
-"""
+"Replay ZEO input logs in read-only mode"
-from multiprocessing import Process, Queue
-# from threading import Thread as Process
-# from Queue import Queue
-
import cPickle
import logging
import marshal
+import multiprocessing
import optparse
import os
+import Queue
import sys
import tempfile
import threading
@@ -141,6 +138,98 @@
),
out)
+def zmqProcessFactory(zmqaddr):
+    """Return a Process factory whose processes run zmqWorker.
+
+    The returned callable accepts the same ``target`` and ``args``
+    keywords that main() passes to multiprocessing.Process, but the
+    process it builds runs zmqWorker(zmqaddr, target, args) instead of
+    calling target directly.
+    """
+    def make_process(target, args):
+        worker_args = (zmqaddr, target, args)
+        return multiprocessing.Process(target=zmqWorker, args=worker_args)
+
+    return make_process
+
+def zmqWorker(zmqaddr, target, args):
+    """Run one handler target in this process, talking to a 0mq boss.
+
+    Connects an XREQ socket to zmqaddr and sends the pickled session id
+    (the third-from-last element of args).  It then calls target with
+    args, replacing the trailing two queue arguments with a single
+    zmqQueue that wraps the socket and serves as both queues.
+    """
+    import zmq
+    ctx = zmq.Context()
+    sock = ctx.socket(zmq.XREQ)
+    sock.connect(zmqaddr)
+
+    # Announce which session this worker serves; the boss registers a
+    # route for that session when it receives an int.
+    announcement = cPickle.dumps(args[-3], 1)
+    sock.send(announcement)
+
+    # One adapter stands in for both the input and the output queue.
+    channel = zmqQueue(sock)
+    handler_args = args[:-2] + (channel, channel)
+    target(*handler_args)
+
+class zmqQueue:
+    """Queue-like adapter over a connected 0mq XREQ socket.
+
+    zmqWorker passes one instance in place of both multiprocessing
+    queues that a handler target normally receives.  It therefore
+    provides only get() and put().  Objects are pickled on the wire.
+    """
+
+    def __init__(self, socket):
+        self.socket = socket
+        self._send = socket.send
+
+    def get(self, block=True, timeout=None):
+        """Return the next unpickled message, following Queue.get().
+
+        By default, block until a message arrives.  If block is false,
+        or a timeout in seconds is given, raise Queue.Empty when no
+        message is available in time.
+        """
+        if block and timeout is None:
+            data = self.socket.recv()
+        else:
+            # Socket.recv() has no timeout parameter (its first
+            # positional argument is flags), so poll before receiving.
+            import zmq
+            poller = zmq.Poller()
+            poller.register(self.socket, zmq.POLLIN)
+            if block:
+                wait_ms = int(timeout * 1000)
+            else:
+                wait_ms = 0
+            if not poller.poll(wait_ms):
+                raise Queue.Empty
+            data = self.socket.recv()
+        # NOTE(review): cPickle.loads on network data trusts the peer
+        # completely; only use on trusted networks.
+        return cPickle.loads(data)
+
+    def put(self, data):
+        """Pickle data and send it over the socket."""
+        self._send(cPickle.dumps(data, 1))
+
+class zmqBoss:
+    """Dispatch replayed requests to remote 0mq workers.
+
+    The constructor binds an XREP socket at zmqaddr and collects worker
+    announcements.  For each announced (int) session id,
+    sessions[session] is set to a callable that routes an object to
+    that worker.  Any other message received from a worker is put on
+    handlers_queue.  Once registration ends, a daemon thread (run)
+    relays messages in both directions.
+    """
+
+    def __init__(self, zmqaddr, sessions, handlers_queue, startup_delay=10):
+        """Bind, register workers, then start the relay thread.
+
+        startup_delay is the number of seconds to wait after binding
+        so workers can connect before registration starts.  Registration
+        ends at the first one-second poll with no incoming message.
+        """
+        import zmq
+        context = zmq.Context()
+        socket = context.socket(zmq.XREP)
+        socket.bind(zmqaddr)
+        time.sleep(startup_delay) # Give lots of time to hook up w workers
+        poller = self.poller = zmq.Poller()
+        poller.register(socket, zmq.POLLIN)
+        # Outgoing (worker address, object) pairs, filled by the
+        # session_put callables and drained by run().
+        handler_queue = Queue.Queue()
+        while 1:
+            ready = dict(poller.poll(1000))
+            if ready.get(socket) != zmq.POLLIN:
+                # A quiet second: assume every worker has announced.
+                break
+            addr, session = socket.recv_multipart()
+            # NOTE(review): unpickling network data lets any host that can
+            # reach zmqaddr run arbitrary code; bind only on trusted networks.
+            session = cPickle.loads(session)
+            if isinstance(session, int):
+                sessions[session] = self.session_put(handler_queue, addr)
+            else:
+                handlers_queue.put(session)
+
+        print 'Got', len(sessions), 'zmq workers'
+        thread = threading.Thread(
+            target = self.run,
+            args = (poller, socket, handler_queue, handlers_queue),
+            )
+        thread.daemon = True
+        thread.start()
+
+    def session_put(self, queue, addr):
+        """Return a callable that queues ob for the worker at addr."""
+
+        def put(ob):
+            queue.put((addr, ob))
+
+        return put
+
+    def run(self, poller, socket, handler_queue, handlers_queue):
+        """Relay loop: send queued objects to workers and collect replies.
+
+        Pending outgoing messages are all sent before the socket is
+        polled.  When nothing is pending, a 10ms poll bounds how long a
+        newly queued message waits.  Replies that are not session ids
+        are forwarded to handlers_queue.  Sessions announced after
+        registration are only reported, not routed.
+        """
+        import zmq
+        while 1:
+            try:
+                addr, ob = handler_queue.get(False)
+            except Queue.Empty:
+                pass
+            else:
+                socket.send_multipart([addr, cPickle.dumps(ob, 1)])
+                continue
+
+            ready = dict(poller.poll(10))
+            if ready.get(socket) == zmq.POLLIN:
+                _, data = socket.recv_multipart()
+                data = cPickle.loads(data)
+                if isinstance(data, int):
+                    print 'Late session', data
+                else:
+                    handlers_queue.put(data)
+
+
class Handler:
msgid = 0
@@ -401,8 +490,14 @@
return times
parser = optparse.OptionParser("""
-Usage: %prog [options] address log
+Usage:
+ %prog [options] address log sessions
+
+ %prog --zmq-boss tcp://IP:PORT [options] log
+
+ %prog --zmq-worker tcp://IP:PORT [options] address sessions
+
Where:
address
@@ -410,6 +505,15 @@
log
zeo input log
+
+ sessions
+ A file containing a list of session identifiers.
+
+
+ To use 0mq, start one or more worker processes and then start a
+ boss process. When specifying an address, a numeric IP address,
+ rather than a host name, must be used.
+
""")
parser.add_option("--max_records", "-m", dest='max_records',
type="int", default=999999999,
@@ -429,64 +533,90 @@
help="Port to get status data from.")
+# 0mq coordination options; optparse derives dest names zmq_boss and
+# zmq_worker from the long option names.
+parser.add_option("--zmq-boss", metavar="tcp://IP:PORT",
+                  help="Bind to this 0mq address and dispatch the log"
+                       " to connected workers.")
+parser.add_option("--zmq-worker", metavar="tcp://IP:PORT",
+                  help="Connect to a boss at this 0mq address and run"
+                       " handlers for the given sessions.")
+
+
def main(args=None):
if args is None:
args = sys.argv[1:]
# Maybe add options for following:
- singe_threaded = False
simulate_ssd = False
print "$Id$"
print args
options, args = parser.parse_args(args)
- addr, log, sessionids = args
+
+ log = addr = sessionids = ()
+ if options.zmq_boss:
+ [log] = args
+ else:
+ if options.zmq_worker:
+ addr, sessionids = args
+ else:
+ addr, log, sessionids = args
+
+ if log:
+ log = Log(log, set(('loadEx', 'sendBlob')))
+ if addr:
+ addr = parse_addr(addr)
+ if sessionids:
+ sessionids = open(sessionids).read().strip().split()
+
max_records = options.max_records
- addr = parse_addr(addr)
- log = Log(log, set(('loadEx', 'sendBlob')))
- sessionids = open(sessionids).readlines()
-
# Set up the client connections
sessions = {}
- nhandlers = 0
- handlers_queue = Queue()
+ handlers_queue = multiprocessing.Queue()
processes = []
+
+ if options.zmq_worker:
+ process_factory = zmqProcessFactory(options.zmq_worker)
+ else:
+ process_factory = multiprocessing.Process
+
for session in sessionids:
session = int(session.strip())
- if singe_threaded:
- session = '1'
+
if session not in sessions:
- handler_queue = Queue()
+ handler_queue = multiprocessing.Queue()
if options.s3:
- process = Process(
+ process = process_factory(
target = S3Handler,
args = (options.s3,
- addr, nhandlers, handler_queue, handlers_queue),
+ addr, session, handler_queue, handlers_queue),
)
elif options.blob_url:
- process = Process(
+ process = process_factory(
target = HTTPHandler,
args = (options.blob_url,
- addr, nhandlers, handler_queue, handlers_queue),
+ addr, session, handler_queue, handlers_queue),
)
else:
- process = Process(
+ process = process_factory(
target = Handler,
- args = (addr, nhandlers, handler_queue, handlers_queue),
+ args = (addr, session, handler_queue, handlers_queue),
)
- process.daemon = True
+ process.daemon = not options.zmq_worker
process.start()
processes.append(process)
- sessions[session] = handler_queue
- nhandlers += 1
+ sessions[session] = handler_queue.put
+ if options.zmq_worker:
+ return
+
+ if options.zmq_boss:
+ handlers_queue = Queue.Queue()
+ zmqBoss(options.zmq_boss, sessions, handlers_queue)
+
nsessions = len(sessions)
handlers = Handlers(nsessions)
thread = threading.Thread(target=handlers.run, args=(handlers_queue, ))
+ thread.daemon = True
thread.start()
handlers.event.wait(10)
@@ -504,9 +634,6 @@
if session not in sessions: # Skip unknown sessions
continue
- if singe_threaded:
- session = '1'
-
if simulate_ssd:
if op == 'loadEx':
args = (ZODB.utils.z64,)
@@ -552,12 +679,12 @@
handlers.maxactive = 0
- sessions[session].put((async, op, args))
+ sessions[session]((async, op, args))
if nrecords >= max_records:
break
- for q in sessions.values():
- q.put('stop')
+ for put in sessions.values():
+ put('stop')
print '='*70
More information about the checkins
mailing list