[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py Tweaked to only use a zmq socket in one thread.
Jim Fulton
jim at zope.com
Fri Aug 12 13:10:53 EDT 2011
Log message for revision 122566:
Tweaked to only use a zmq socket in one thread.
Also output a little more debug info.
Changed:
U zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-08-12 15:55:08 UTC (rev 122565)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-08-12 17:10:53 UTC (rev 122566)
@@ -171,7 +171,22 @@
class zmqBoss:
- def __init__(self, zmqaddr, sessions, handlers_queue):
+ def __init__(self, zmqaddr, sessions, handlers_queue, event):
+ thread = threading.Thread(
+ target = self.run,
+ args = (zmqaddr, sessions, handlers_queue, event),
+ )
+ thread.daemon = True
+ thread.start()
+
+ def session_put(self, queue, addr):
+
+ def put(ob):
+ queue.put((addr, ob))
+
+ return put
+
+ def run(self, zmqaddr, sessions, handlers_queue, event):
import zmq
context = zmq.Context()
socket = context.socket(zmq.XREP)
@@ -182,7 +197,7 @@
session_addrs = {}
handler_queue = Queue.Queue()
while 1:
- ready = dict(poller.poll(1000))
+ ready = dict(poller.poll(2000))
if ready.get(socket) == zmq.POLLIN:
addr, session = socket.recv_multipart()
session = cPickle.loads(session)
@@ -194,23 +209,8 @@
else:
break
+ event.set()
print 'Got', len(sessions), 'zmq workers'
- thread = threading.Thread(
- target = self.run,
- args = (poller, socket, handler_queue, handlers_queue),
- )
- thread.daemon = True
- thread.start()
-
- def session_put(self, queue, addr):
-
- def put(ob):
- queue.put((addr, ob))
-
- return put
-
- def run(self, poller, socket, handler_queue, handlers_queue):
- import zmq
while 1:
try:
addr, ob = handler_queue.get(False)
@@ -606,12 +606,16 @@
processes.append(process)
sessions[session] = handler_queue.put
+ print len(sessionids), 'sessions'
+
if options.zmq_worker:
return
if options.zmq_boss:
handlers_queue = Queue.Queue()
- zmqBoss(options.zmq_boss, sessions, handlers_queue)
+ event = threading.Event()
+ zmqBoss(options.zmq_boss, sessions, handlers_queue, event)
+ event.wait()
nsessions = len(sessions)
handlers = Handlers(nsessions)
More information about the checkins
mailing list