[Checkins] SVN: Sandbox/J1m/mongrel2-concierge/ checkpoint
Jim Fulton
jim at zope.com
Fri Aug 19 16:52:40 EDT 2011
Log message for revision 122616:
checkpoint
Changed:
U Sandbox/J1m/mongrel2-concierge/README.txt
U Sandbox/J1m/mongrel2-concierge/buildout.cfg
U Sandbox/J1m/mongrel2-concierge/setup.py
A Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/
A Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/__init__.py
A Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py
-=-
Modified: Sandbox/J1m/mongrel2-concierge/README.txt
===================================================================
--- Sandbox/J1m/mongrel2-concierge/README.txt 2011-08-19 20:39:40 UTC (rev 122615)
+++ Sandbox/J1m/mongrel2-concierge/README.txt 2011-08-19 20:52:40 UTC (rev 122616)
@@ -1,10 +1,12 @@
-Title Here
-**********
+Mongrel2-based request-concierge experiment
+*******************************************
+We have applications that manage and deliver large content collections
+spread over multiple machines. We want to route requests to these machines
+intelligently so that similar requests tend to stay on the same
+machines.
-To learn more, see
-
Changes
*******
Modified: Sandbox/J1m/mongrel2-concierge/buildout.cfg
===================================================================
--- Sandbox/J1m/mongrel2-concierge/buildout.cfg 2011-08-19 20:39:40 UTC (rev 122615)
+++ Sandbox/J1m/mongrel2-concierge/buildout.cfg 2011-08-19 20:52:40 UTC (rev 122616)
@@ -4,7 +4,7 @@
[test]
recipe = zc.recipe.testrunner
-eggs =
+eggs = zc.m2rc [test]
[py]
recipe = zc.recipe.egg
Modified: Sandbox/J1m/mongrel2-concierge/setup.py
===================================================================
--- Sandbox/J1m/mongrel2-concierge/setup.py 2011-08-19 20:39:40 UTC (rev 122615)
+++ Sandbox/J1m/mongrel2-concierge/setup.py 2011-08-19 20:52:40 UTC (rev 122616)
@@ -11,12 +11,14 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-name, version = 'zc.', '0'
+name, version = 'zc.m2rc', '0'
-install_requires = ['setuptools']
+install_requires = ['setuptools', 'pyzmq-static', 'pylru']
extras_require = dict(test=['zope.testing'])
entry_points = """
+[console_scripts]
+simul = zc.m2rc.simul:main
"""
from setuptools import setup
Added: Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/__init__.py
===================================================================
--- Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/__init__.py (rev 0)
+++ Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/__init__.py 2011-08-19 20:52:40 UTC (rev 122616)
@@ -0,0 +1 @@
+#
Property changes on: Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/__init__.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py
===================================================================
--- Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py (rev 0)
+++ Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py 2011-08-19 20:52:40 UTC (rev 122616)
@@ -0,0 +1,382 @@
+import bisect
+import cPickle
+import logging
+import multiprocessing
+import optparse
+import os
+import random
+import pylru
+import time
+import threading
+import traceback
+import sys
+import zmq
+
+# ZeroMQ endpoints shared by the simulated processes in this module.
+# mongrel2() binds a PUSH socket here; broker() connects a PULL socket to it.
+mongrel2_pushreq_addr = 'tcp://127.0.0.1:9901'
+# mongrel2() binds a SUB socket here; worker() connects a PUB socket to it.
+mongrel2_subresp_addr = 'tcp://127.0.0.1:9902'
+# broker() binds an XREP socket here; worker() connects an XREQ socket to it.
+broker_apply_addr = 'tcp://127.0.0.1:9903'
+
+# Root logger prints the bare message text for records at INFO and above.
+logging.basicConfig(level=logging.INFO, format='%(message)s')
+
+# Level at which log() emits its messages.
+log_level = logging.INFO
+
+def log(*args):
+    """Log all positional arguments as one space-separated message.
+
+    Each argument is converted with ``str`` and the result is emitted
+    through the root logger at the module-level ``log_level``.
+    """
+    message = ' '.join(str(arg) for arg in args)
+    logging.log(log_level, message)
+
+def mongrel2(nsites=400, noids=10000, nout=1000, minsize=100, maxsize=2000,
+             max_requests=100000, show_responses=True):
+    """Simulate the mongrel2 side: generate requests and check responses.
+
+    Requests are pushed on ``mongrel2_pushreq_addr`` as pickled lists of the
+    form ``[site, request_number, oid, oid, ...]``.  Responses arrive on
+    ``mongrel2_subresp_addr`` and are checked against the expected oid sum.
+
+    Arguments:
+
+    nsites -- number of distinct sites to draw requests from
+    noids -- size of the oid pool that each request samples from
+    nout -- maximum number of requests allowed to be outstanding at once
+    minsize, maxsize -- inclusive bounds on the number of oids per request
+    max_requests -- total number of requests to generate before finishing
+    show_responses -- when true, log a line for every response received
+
+    Returns once ``max_requests`` requests have been sent and every one of
+    them has been answered.
+    """
+    log('mongrel2 pid=%s' % os.getpid())
+    context = zmq.Context()
+    push = context.socket(zmq.PUSH)
+    push.bind(mongrel2_pushreq_addr)
+    sub = context.socket(zmq.SUB)
+    sub.bind(mongrel2_subresp_addr)
+    sub.setsockopt(zmq.SUBSCRIBE, '')
+
+    poller = zmq.Poller()
+    poller.register(sub, zmq.POLLIN)
+
+    oids = range(noids)
+    sites = range(nsites)
+
+    # grossly model unequal site sizes: low-numbered sites are repeated in
+    # the list so that random.choice picks them more often.
+    sites = sites[:100] + sites
+    sites = sites[:50] + sites
+    sites = sites[:20] + sites
+    sites = sites[:10]*2 + sites
+    sites = sites[:5]*3 + sites
+
+    nreq = 0
+    # request number -> (expected oid sum, send time, site, size)
+    outstanding = {}
+    while 1:
+
+        if nreq >= max_requests and not outstanding:
+            break
+
+        # Top up the pipeline until nout requests are in flight.
+        while nreq < max_requests and len(outstanding) < nout:
+            site = random.choice(sites)
+            size = random.randint(minsize, maxsize)
+            work = random.sample(oids, size)
+            nreq += 1
+            # Remember site and size with the request so the response log
+            # reports the values of the request actually being answered,
+            # not those of whichever request was generated most recently.
+            outstanding[nreq] = sum(work), time.time(), site, size
+            work[0:0] = site, nreq
+            push.send(cPickle.dumps(work, 1))
+
+        ready = dict(poller.poll())
+        if ready.get(sub) == zmq.POLLIN:
+            mess = cPickle.loads(sub.recv())
+            if isinstance(mess, str):
+                # NOTE(review): this unpickles and executes whatever arrives
+                # on the socket.  It is a debugging back door that is only
+                # acceptable because the sockets are bound to localhost;
+                # never expose this endpoint to untrusted peers.
+                exec mess
+                continue
+
+            rreq, result, wtime, n, nhit, nmiss, nevict, wid, lresume = mess
+            sresult, start, rsite, rsize = outstanding.pop(rreq)
+            assert result == sresult, (rreq, result, sresult)
+            if show_responses:
+                log('response', rreq, rsite, rsize, time.time()-start,
+                    wtime, n, nhit, nmiss, nevict, wid, lresume)
+
+
+def worker(id, cache_max, queue_size=5, nsummary=100, time_ring_size=100,
+ prefix='worker'):
+ """Simulated worker: fetch jobs from the broker and answer mongrel2.
+
+ The worker asks the broker for work over an XREQ socket, runs each
+ job's oids through a pylru cache and publishes one response per job
+ on a PUB socket connected to mongrel2_subresp_addr.
+
+ Arguments:
+
+ id -- worker number; combined with prefix to form the worker id string
+ cache_max -- size passed to pylru.lrucache
+ queue_size -- number of work requests kept outstanding with the broker
+ nsummary -- print a summary line every nsummary jobs
+ time_ring_size -- number of slots in the (site, elapsed) ring; the
+ resume is recomputed every time_ring_size jobs
+ prefix -- text placed before id in the worker id string
+
+ Every work request sent to the broker carries (id, resume, resume_gen),
+ where resume maps site -> score and resume_gen is bumped whenever the
+ resume changes.
+ """
+ id = '%s%s' % (prefix, id)
+ log('%s pid=%s' % (id, os.getpid()))
+ resume = {}
+ resume_gen = 0
+
+ context = zmq.Context()
+ broker = context.socket(zmq.XREQ)
+ broker.connect(broker_apply_addr)
+
+ # Initial request for work; nrequested counts requests not yet answered.
+ broker.send(cPickle.dumps((id, resume, resume_gen), 1))
+ nrequested = 1
+
+ m2 = context.socket(zmq.PUB)
+ m2.connect(mongrel2_subresp_addr)
+
+ cache = pylru.lrucache(cache_max)
+
+ time_ring = []
+ time_ring_pos = 0
+
+ njobs = 0
+ sum_elapsed = sum_oids = sum_hits = sum_miss = sum_evicts = 0
+ while 1:
+ # Periodic summary: id, job count, nsummary*60/sum_elapsed, then the
+ # per-job averages of oids, hits, misses and evictions, then the resume
+ # scores as ints in descending order.  The sums are reset afterwards.
+ if njobs%nsummary == 0 and njobs:
+ print ' '.join(map(str, (
+ id, njobs,
+ int(nsummary*60/sum_elapsed),
+ sum_oids/nsummary,
+ sum_hits/nsummary,
+ sum_miss/nsummary,
+ sum_evicts/nsummary,
+ list(reversed(sorted(map(int, resume.itervalues())))),
+ )))
+ sum_elapsed = sum_oids = sum_hits = sum_miss = sum_evicts = 0
+
+ # A job is a list: [site, request number, oid, oid, ...].
+ job = cPickle.loads(broker.recv())
+ njobs += 1
+ nrequested -= 1
+ site = job.pop(0)
+ # A site seen for the first time enters the resume with score 1.
+ if site not in resume:
+ resume[site] = 1
+ resume_gen += 1
+
+ # request more work while we're working on this request, so we don't
+ # have to wait for a round trip when we're ready for more.
+ while nrequested < queue_size:
+ broker.send(cPickle.dumps((id, resume, resume_gen), 1))
+ nrequested += 1
+
+ nreq = job.pop(0)
+ start = time.time()
+ n = 0
+ nhit = nmiss = nevict = 0
+ # NOTE(review): indentation was lost in this archived copy, so it cannot
+ # be confirmed from here whether the cache insert and the sleep below run
+ # for every oid or only on a miss -- presumably only on a miss; verify
+ # against the repository.
+ for oid in job:
+ n += 1
+ key = site, oid
+ if key in cache:
+ nhit += 1
+ else:
+ nmiss += 1
+ # The cache is already full, so this insert is counted as an eviction.
+ if len(cache) >= cache_max:
+ nevict += 1
+ cache[key] = 1
+
+ # optimistically simulate a db load
+ time.sleep(.001)
+
+ # Clamp to a small positive value so later divisions cannot hit zero.
+ elapsed = max(time.time() - start, .0001)
+
+ # Response: request number, oid sum (checked by mongrel2), timing and
+ # cache statistics, worker id and resume size.
+ m2.send(cPickle.dumps(
+ (nreq, sum(job),
+ elapsed, n, nhit, nmiss, nevict, id, len(resume))))
+
+ sum_elapsed += elapsed
+ sum_oids += n
+ sum_hits += nhit
+ sum_miss += nmiss
+ sum_evicts += nevict
+
+ # Overwrite this job's ring slot, or append while the list is still
+ # shorter than the slot index.
+ time_ring_pos = njobs % time_ring_size
+ try:
+ time_ring[time_ring_pos] = site, elapsed
+ except IndexError:
+ time_ring.append((site, elapsed))
+
+ # Every time_ring_size jobs, rebuild the resume from the ring: each
+ # site's score is its job count divided by its total elapsed time.
+ if njobs % time_ring_size == 0:
+ bysite = {}
+ for site, elapsed in time_ring:
+ sumn = bysite.get(site)
+ if sumn:
+ sumn[0] += elapsed
+ sumn[1] += 1
+ else:
+ bysite[site] = [elapsed, 1]
+ resume = dict((site, n/sum)
+ for (site, (sum, n)) in bysite.iteritems()
+ )
+ resume_gen += 1
+
+
+def broker():
+    """Route requests from mongrel2 to the worker best suited to each site.
+
+    Pulls pickled work messages from ``mongrel2_pushreq_addr`` and accepts
+    work requests from workers on ``broker_apply_addr``.  Every worker
+    request carries ``(wid, resume, resume_gen)``; each one grants the
+    worker one more unit of work (``Worker.count``).  A job goes to the
+    last entry of the site's sorted worker list when the site has one,
+    otherwise to the last entry of the overall list.
+
+    Runs forever; it does not return.
+    """
+    # The global declaration has to precede any use of the name in this
+    # function.  Declaring it after the assignment is a SyntaxWarning in
+    # Python 2 and a SyntaxError in Python 3.  The name is module-global
+    # because Worker.register/unregister look it up there.
+    global workers_by_site
+
+    log('broker pid=%s' % os.getpid())
+    context = zmq.Context()
+    m2_socket = context.socket(zmq.PULL)
+    m2_socket.connect(mongrel2_pushreq_addr)
+    workers_socket = context.socket(zmq.XREP)
+    workers_socket.bind(broker_apply_addr)
+
+    # site -> sorted list of (score, Worker); the None key lists all workers.
+    workers_by_site = {None: []}
+    workers = workers_by_site[None]
+    workers_by_addr = {}
+    availability = {}
+
+    poller = zmq.Poller()
+    poller.register(m2_socket, zmq.POLLIN)
+    poller.register(workers_socket, zmq.POLLIN)
+
+    nreq = nhit = 0
+    lastt = time.time()
+
+    while 1:
+
+        # With no registered workers, wait up to a second for one to apply;
+        # otherwise poll without blocking.
+        ready = dict(poller.poll(0 if workers else 1000))
+
+        if ready.get(workers_socket) == zmq.POLLIN:
+            addr, resume = workers_socket.recv_multipart()
+            resume = cPickle.loads(resume)
+            if isinstance(resume, str):
+                # NOTE(review): this unpickles and executes whatever arrives
+                # on the socket.  It is a debugging back door that is only
+                # acceptable because the socket is bound to localhost; never
+                # expose this endpoint to untrusted peers.
+                exec resume
+                continue
+
+            wid, resume, resume_gen = resume
+
+            worker = workers_by_addr.get(addr)
+            if worker is None:
+                worker = workers_by_addr[addr] = Worker(wid, addr)
+            else:
+                # One more outstanding work request from a known worker.
+                worker.count += 1
+
+            worker.new_resume(resume, resume_gen)
+
+            # we want to collect resumes before assigning work so as
+            # to increase the chance of finding the best worker for
+            # the job.
+            continue
+
+        if workers and ready.get(m2_socket) == zmq.POLLIN:
+            nreq += 1
+            work_message = m2_socket.recv()
+            data = cPickle.loads(work_message)
+            site = data[0]
+            site_workers = workers_by_site.get(site)
+            if site_workers:
+                worker = site_workers[-1][1]
+                nhit += 1
+            else:
+                worker = workers[-1][1]
+
+            # The job uses up one of the worker's work requests; a worker
+            # with none left is dropped until it applies again.
+            worker.count -= 1
+            if worker.count < 1:
+                worker.unregister()
+                del workers_by_addr[worker.addr]
+
+            if nreq%1000 == 0:
+                log('hitrate',
+                    100*nhit/nreq,
+                    len(workers),
+                    len(workers_by_site),
+                    int(60*nreq/(time.time()-lastt)),
+                    )
+
+            workers_socket.send_multipart([worker.addr, work_message])
+
+
+class Worker:
+    """Broker-side record of one worker and its place in the site indexes.
+
+    Instances are kept in the module-global ``workers_by_site`` mapping,
+    whose values are lists of ``(score, worker)`` tuples kept in sorted
+    order.  The ``None`` key holds every registered worker, scored by the
+    negated sum of its resume values; every other key holds the workers
+    whose resume mentions that site, scored by the resume value.
+    """
+
+    # Number of work requests the worker has outstanding with the broker.
+    count = 1
+    # Generation of the resume currently registered; None before the first.
+    resume_gen = None
+
+    def __init__(self, wid, addr):
+        """Remember the worker id and the socket address used to reach it."""
+        self.wid = wid
+        self.addr = addr
+
+    def new_resume(self, resume, resume_gen):
+        """Replace the registered resume unless the generation is unchanged."""
+        if resume_gen == self.resume_gen:
+            return
+
+        # Drop the entries made for the previous resume before re-indexing.
+        if self.resume_gen is not None:
+            self.unregister()
+
+        self.resume = resume
+        self.resume_gen = resume_gen
+        self.score = - sum(resume.itervalues())
+        self.register()
+
+    def register(self):
+        """Insert this worker into the overall list and each site list."""
+
+        def _register(workers, score):
+            bisect.insort_left(workers, (score, self))
+
+        _register(workers_by_site[None], self.score)
+
+        for site, score in self.resume.iteritems():
+            site_workers = workers_by_site.get(site)
+            if site_workers is None:
+                site_workers = workers_by_site[site] = []
+            _register(site_workers, score)
+
+    def unregister(self):
+        """Remove this worker from the overall list and each site list.
+
+        Lists that do not contain this worker are left untouched, so
+        another worker's entry is never deleted by mistake.
+        """
+
+        def _unregister(workers, score):
+            index = bisect.bisect_left(workers, (score, self))
+            if not (index < len(workers) and workers[index][1] is self):
+                # The bisect position is not ours; fall back to a scan
+                # and give up quietly if we are not in the list at all.
+                for index, (_, candidate) in enumerate(workers):
+                    if candidate is self:
+                        break
+                else:
+                    return
+            del workers[index]
+
+        _unregister(workers_by_site[None], self.score)
+
+        for site, score in self.resume.iteritems():
+            site_workers = workers_by_site.get(site)
+            # An unknown or empty site list has nothing to remove.  Do not
+            # create an empty list here only to index into it.
+            if site_workers:
+                _unregister(site_workers, score)
+
+    def __repr__(self):
+        return "worker(%r, %s, %s)" % (
+            self.wid, len(self.resume), self.score)
+
+parser = optparse.OptionParser("Usage: %prog [options]")
+
+# (option strings, keyword arguments), registered in --help order:
+# worker settings first, then the simulated-mongrel2 settings.
+for _opt_strings, _opt_kwargs in (
+    (("--workers", "-w"), dict(type="int", default=20)),
+    (("--cache-size", "-c"), dict(type="int", default=10000)),
+    (("--worker-queue-size", "-q"), dict(type="int", default=5)),
+    (("--sites", "-s"), dict(type="int", default=400)),
+    (("--oids",), dict(type="int", default=10000, help="oids per site")),
+    (("--outstanding", "-o"), dict(type="int", default=200)),
+    (("--min-size",), dict(type="int", default=100)),
+    (("--max-size",), dict(type="int", default=2000)),
+    (("--requests", "-R"), dict(type="int", default=100000)),
+    (("--show-responses", "-r"), dict(action='store_true')),
+    (("--workers-only",), dict()),
+):
+    parser.add_option(*_opt_strings, **_opt_kwargs)
+del _opt_strings, _opt_kwargs
+
+def main(args=None):
+    """Console-script entry point: start the simulation processes.
+
+    args -- list of command-line arguments; defaults to ``sys.argv[1:]``
+
+    Unless ``--workers-only`` is given, a mongrel2 process and a daemonic
+    broker process are started and this function waits for the mongrel2
+    process to finish.  ``--workers`` worker processes are always started;
+    they are daemonic except in ``--workers-only`` mode, where the option's
+    value also replaces the default ``'worker'`` id prefix.
+    """
+    # new_worker_process is published as a module global below.
+    global new_worker_process
+
+    if args is None:
+        args = sys.argv[1:]
+
+    print args
+    options, args = parser.parse_args(args)
+
+    Process = multiprocessing.Process
+
+    if not options.workers_only:
+        mongrel_process = Process(
+            target=mongrel2,
+            kwargs=dict(
+                nsites=options.sites,
+                noids=options.oids,
+                nout=options.outstanding,
+                minsize=options.min_size,
+                maxsize=options.max_size,
+                max_requests=options.requests,
+                show_responses=options.show_responses,
+                ))
+        mongrel_process.start()
+
+        p = Process(target=broker)
+        p.daemon = True
+        p.start()
+
+    def new_worker_process(
+        id,
+        cache_max=options.cache_size,
+        queue_size=options.worker_queue_size,
+        prefix=options.workers_only or 'worker',
+        daemon=True
+        ):
+        p = Process(
+            target=worker, kwargs=dict(
+                # Pass the id this function was given.  The original passed
+                # the enclosing loop variable ``i``, which silently ignored
+                # the ``id`` argument.
+                id=id,
+                cache_max=cache_max,
+                queue_size=queue_size,
+                prefix=prefix
+                ))
+        p.daemon = daemon
+        p.start()
+
+    for i in range(options.workers):
+        new_worker_process(id=i, daemon=not options.workers_only)
+
+    if not options.workers_only:
+        mongrel_process.join()
Property changes on: Sandbox/J1m/mongrel2-concierge/src/zc/m2rc/simul.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
More information about the checkins
mailing list