[Checkins] SVN: Sandbox/J1m/mongrel2-concierge/s Fixed a 0mq polling bug that caused excessive broker CPU usage.

Jim Fulton jim at zope.com
Tue Oct 4 06:29:26 EST 2011


Log message for revision 123011:
  Fixed a 0mq polling bug that caused excessive broker CPU usage.
  
  Rearranged code to make profiling easier.
  
  Use marshal rather than cPickle.  This was an attempt to reduce broker
  CPU usage before the polling bug was found.
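
A minimal sketch of the polling change, for context only. The socket and
address below are hypothetical and not the ones used in simul.py; only the
poll() calls mirror the diff:

    import zmq

    context = zmq.Context()
    pull = context.socket(zmq.PULL)
    pull.bind("tcp://127.0.0.1:5555")  # hypothetical address for illustration

    poller = zmq.Poller()
    poller.register(pull, zmq.POLLIN)

    # Before the fix, the broker called poller.poll(0) whenever it had
    # workers registered; a zero timeout returns immediately even when no
    # socket is ready, so the loop spun and burned CPU.
    # ready = dict(poller.poll(0))

    # After the fix, a bare poll() blocks until a registered socket is
    # readable, so the loop sleeps instead of busy-waiting.
    ready = dict(poller.poll())
    if ready.get(pull) == zmq.POLLIN:
        message = pull.recv()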
  

Changed:
  U   Sandbox/J1m/mongrel2-concierge/setup.py
  U   Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py

-=-
Modified: Sandbox/J1m/mongrel2-concierge/setup.py
===================================================================
--- Sandbox/J1m/mongrel2-concierge/setup.py	2011-10-04 10:55:03 UTC (rev 123010)
+++ Sandbox/J1m/mongrel2-concierge/setup.py	2011-10-04 11:29:26 UTC (rev 123011)
@@ -19,6 +19,8 @@
 entry_points = """
 [console_scripts]
 simul = zc.m2rc.simul:main
+broker_exec = zc.m2rc.simul:broker_exec
+m2_exec = zc.m2rc.simul:m2_exec
 """
 
 from setuptools import setup

Modified: Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py
===================================================================
--- Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py	2011-10-04 10:55:03 UTC (rev 123010)
+++ Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py	2011-10-04 11:29:26 UTC (rev 123011)
@@ -1,5 +1,7 @@
+from marshal import dumps, loads
+
 import bisect
-import cPickle
+import marshal
 import logging
 import multiprocessing
 import optparse
@@ -61,11 +63,11 @@
             outstanding[nreq] = sum(work), time.time()
             work[0:0] = site, nreq
             #log('request', nreq, sum(work[2:]), site, size)
-            push.send(cPickle.dumps(work, 1))
+            push.send(dumps(work, 1))
 
         ready = dict(poller.poll())
         if ready.get(sub) == zmq.POLLIN:
-            mess = cPickle.loads(sub.recv())
+            mess = loads(sub.recv())
             if isinstance(mess, str):
                 exec mess
                 continue
@@ -89,7 +91,7 @@
     broker = context.socket(zmq.XREQ)
     broker.connect(broker_apply_addr)
 
-    broker.send(cPickle.dumps((id, resume, resume_gen), 1))
+    broker.send(dumps((id, resume, resume_gen), 1))
     nrequested = 1
 
     m2 = context.socket(zmq.PUB)
@@ -115,7 +117,7 @@
                 )))
             sum_elapsed = sum_oids = sum_hits = sum_miss = sum_evicts = 0
 
-        job = cPickle.loads(broker.recv())
+        job = loads(broker.recv())
         njobs += 1
         nrequested -= 1
         site = job.pop(0)
@@ -126,7 +128,7 @@
         # request more work while we're working on this request, so we don't
         # have to wait for a round trip when we're ready for more.
         while nrequested < queue_size:
-            broker.send(cPickle.dumps((id, resume, resume_gen), 1))
+            broker.send(dumps((id, resume, resume_gen), 1))
             nrequested += 1
 
         nreq = job.pop(0)
@@ -149,7 +151,7 @@
 
         elapsed = max(time.time() - start, .0001)
 
-        m2.send(cPickle.dumps(
+        m2.send(dumps(
             (nreq, sum(job),
              elapsed, n, nhit, nmiss, nevict, id, len(resume))))
 
@@ -180,7 +182,49 @@
             resume_gen += 1
 
 
-def broker():
+def handle_worker(workers_socket, workers_by_addr):
+    addr, resume = workers_socket.recv_multipart()
+    resume = loads(resume)
+    if isinstance(resume, str):
+        return resume
+
+    wid, resume, resume_gen = resume
+
+    worker = workers_by_addr.get(addr)
+    if worker is None:
+        worker = workers_by_addr[addr] = Worker(wid, addr)
+    else:
+        worker.count += 1
+
+    worker.new_resume(resume, resume_gen)
+
+def handle_m2(m2_socket, workers_by_site, workers, workers_by_addr,
+              workers_socket):
+    work_message = m2_socket.recv()
+    data = loads(work_message)
+    site = data[0]
+    site_workers = workers_by_site.get(site)
+    hit = 0
+    if site_workers:
+        worker = site_workers[-1][1]
+        hit = 1
+    else:
+        worker = workers[-1][1]
+
+    worker.count -= 1
+    if worker.count < 1:
+        worker.unregister()
+        del workers_by_addr[worker.addr]
+
+    workers_socket.send_multipart([worker.addr, work_message])
+    return hit
+
+def dump_profile(profiler, name):
+    profiler.disable()
+    profiler.dump_stats(name)
+    profiler.enable()
+
+def broker(profile=None):
     log('broker pid=%s' % os.getpid())
     context = zmq.Context()
     m2_socket = context.socket(zmq.PULL)
@@ -201,27 +245,20 @@
     nreq = nhit = 0
     lastt = time.time()
 
+    if profile:
+        import cProfile
+        profiler = cProfile.Profile()
+        profiler.enable()
+
     while 1:
 
-        ready = dict(poller.poll(0 if workers else 1000))
+        ready = dict(poller.poll())
 
         if ready.get(workers_socket) == zmq.POLLIN:
-            addr, resume = workers_socket.recv_multipart()
-            resume = cPickle.loads(resume)
-            if isinstance(resume, str):
-                exec resume
-                continue
+            command = handle_worker(workers_socket, workers_by_addr)
+            if command:
+                exec command
 
-            wid, resume, resume_gen = resume
-
-            worker = workers_by_addr.get(addr)
-            if worker is None:
-                worker = workers_by_addr[addr] = Worker(wid, addr)
-            else:
-                worker.count += 1
-
-            worker.new_resume(resume, resume_gen)
-
             # we want to collect resumes before assigning work so as
             # to increase the chance of finding the best worker for
             # the job.
@@ -229,21 +266,8 @@
 
         if workers and ready.get(m2_socket) == zmq.POLLIN:
             nreq += 1
-            work_message = m2_socket.recv()
-            data = cPickle.loads(work_message)
-            site = data[0]
-            site_workers = workers_by_site.get(site)
-            if site_workers:
-                worker = site_workers[-1][1]
-                nhit += 1
-            else:
-                worker = workers[-1][1]
-
-            worker.count -= 1
-            if worker.count < 1:
-                worker.unregister()
-                del workers_by_addr[worker.addr]
-
+            nhit += handle_m2(m2_socket, workers_by_site, workers,
+                              workers_by_addr, workers_socket)
             if nreq%1000 == 0:
                 log('hitrate',
                     100*nhit/nreq,
@@ -252,9 +276,6 @@
                     int(60*nreq/(time.time()-lastt)),
                     )
 
-            workers_socket.send_multipart([worker.addr, work_message])
-
-
 class Worker:
 
     count = 1
@@ -326,6 +347,7 @@
 parser.add_option("--requests", "-R", type="int", default=100000)
 parser.add_option("--show-responses", "-r", action='store_true')
 parser.add_option("--workers-only")
+parser.add_option("--profile")
 
 def main(args=None):
     if args is None:
@@ -352,7 +374,9 @@
 
         p = Process(
             target=broker,
-            #kwargs=dict(),
+            kwargs=dict(
+                profile=options.profile,
+                ),
             )
         p.daemon = True
         p.start()
@@ -380,3 +404,19 @@
 
     if not options.workers_only:
         mongrel_process.join()
+
+def broker_exec():
+    context = zmq.Context()
+    broker = context.socket(zmq.XREQ)
+    broker.connect(broker_apply_addr)
+    for arg in sys.argv[1:]:
+        broker.send(dumps(arg))
+    broker.close()
+
+def m2_exec():
+    context = zmq.Context()
+    m2 = context.socket(zmq.PUB)
+    m2.connect(mongrel2_subresp_addr)
+    for arg in sys.argv[1:]:
+        m2.send(dumps(arg))
+    m2.close()


