[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py Tweaked to only use a zmq socket in one thread.

Jim Fulton jim at zope.com
Fri Aug 12 13:10:53 EDT 2011


Log message for revision 122566:
  Tweaked to only use a zmq socket in one thread.
  
  Also output a little more debug info.
  

Changed:
  U   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py

-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-08-12 15:55:08 UTC (rev 122565)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-08-12 17:10:53 UTC (rev 122566)
@@ -171,7 +171,22 @@
 
 class zmqBoss:
 
-    def __init__(self, zmqaddr, sessions, handlers_queue):
+    def __init__(self, zmqaddr, sessions, handlers_queue, event):
+        thread = threading.Thread(
+            target = self.run,
+            args = (zmqaddr, sessions, handlers_queue, event),
+            )
+        thread.daemon = True
+        thread.start()
+
+    def session_put(self, queue, addr):
+
+        def put(ob):
+            queue.put((addr, ob))
+
+        return put
+
+    def run(self, zmqaddr, sessions, handlers_queue, event):
         import zmq
         context = zmq.Context()
         socket = context.socket(zmq.XREP)
@@ -182,7 +197,7 @@
         session_addrs = {}
         handler_queue = Queue.Queue()
         while 1:
-            ready = dict(poller.poll(1000))
+            ready = dict(poller.poll(2000))
             if ready.get(socket) == zmq.POLLIN:
                 addr, session = socket.recv_multipart()
                 session = cPickle.loads(session)
@@ -194,23 +209,8 @@
             else:
                 break
 
+        event.set()
         print 'Got', len(sessions), 'zmq workers'
-        thread = threading.Thread(
-            target = self.run,
-            args = (poller, socket, handler_queue, handlers_queue),
-            )
-        thread.daemon = True
-        thread.start()
-
-    def session_put(self, queue, addr):
-
-        def put(ob):
-            queue.put((addr, ob))
-
-        return put
-
-    def run(self, poller, socket, handler_queue, handlers_queue):
-        import zmq
         while 1:
             try:
                 addr, ob = handler_queue.get(False)
@@ -606,12 +606,16 @@
             processes.append(process)
             sessions[session] = handler_queue.put
 
+    print len(sessionids), 'sessions'
+
     if options.zmq_worker:
         return
 
     if options.zmq_boss:
         handlers_queue = Queue.Queue()
-        zmqBoss(options.zmq_boss, sessions, handlers_queue)
+        event = threading.Event()
+        zmqBoss(options.zmq_boss, sessions, handlers_queue, event)
+        event.wait()
 
     nsessions = len(sessions)
     handlers = Handlers(nsessions)



More information about the checkins mailing list