[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Separated core lb logic into separate pool class to facilitate
Jim Fulton
jim at zope.com
Sun Nov 6 16:49:39 UTC 2011
Log message for revision 123293:
Separated core lb logic into separate pool class to facilitate
testing. Added logic for dealing with backlogs. Refactored data
structures a lot.
Changed:
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
A Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
U Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2011-11-06 16:49:36 UTC (rev 123292)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2011-11-06 16:49:38 UTC (rev 123293)
@@ -11,67 +11,161 @@
logger = logging.getLogger(__name__)
-class Server:
+class LB:
- def __init__(self, wsgi_addr, worker_addr, classifier):
- self.workers = {None: []}
- self.workers_event = gevent.event.Event()
- self.wsgi_addr = wsgi_addr
- self.worker_addr = worker_addr
+ def __init__(self, worker_addr, classifier):
self.classifier = classifier
- self.worker_server = gevent.server.StreamServer(
- worker_addr, self.handle_worker)
- self.worker_server.start()
- self.wsgi_server = gevent.pywsgi.WSGIServer(wsgi_addr, self.handle_wsgi)
- self.wsgi_server.start()
+ self.pool = Pool()
+ gevent.server.StreamServer(worker_addr, self.handle_worker).start()
def handle_worker(self, socket, addr):
logger.info('new worker')
- Worker(self, socket, addr)
+ Worker(self.pool, socket, addr)
def handle_wsgi(self, env, start_response):
rclass = self.classifier(env)
logger.debug('wsgi: %s', rclass)
- env['zc.resumelb.request_class'] = rclass
while 1:
- workers = self.workers.get(rclass)
- if workers:
- worker = workers[-1][1]
- else:
- workers = self.workers.get(None)
- if workers:
- worker = workers[-1][1]
- assert rclass not in worker.resume
- worker.unregister()
- worker.resume[rclass] = 1
- worker.register()
- else:
- self.workers_event.clear()
- self.workers_event.wait()
- continue
-
+ worker = self.pool.get(rclass)
try:
- return worker.handle(env, start_response)
+ return worker.handle(rclass, env, start_response)
except worker.Disconnected:
# XXX need to be more careful about whether
# start_response was called.
- if int(env.get(CONTENT_LENGTH, 0)) == 0:
+ if int(env.get(CONTENT_LENGTH, None)) == 0:
logger.info("retrying %s", env)
else:
raise
+ finally:
+ self.pool.put(worker)
+class Pool:
+
+ def __init__(self, max_backlog=40):
+ self.max_backlog = max_backlog
+ self.unskilled = [set() for i in range(max_backlog+1)]
+ self.skilled = {}
+ self.resumes = {}
+ self.backlogs = {}
+ self.event = gevent.event.Event()
+
+ def __repr__(self):
+ skilled = self.skilled
+ backlogs = self.backlogs
+ outl = []
+ out = outl.append
+ out('Request classes:')
+ for rclass in sorted(skilled):
+ out(' %s: %s'
+ % (rclass,
+ ', '.join('%s(%s,%s)' % (worker, score, backlogs[worker])
+ for (score, worker) in skilled[rclass])
+ ))
+ out('Backlogs:')
+ for backlog, workers in enumerate(self.unskilled):
+ if workers:
+ out(' %s: %s' % (backlog, sorted(workers)))
+ return '\n'.join(outl)
+
+ def new_resume(self, worker, resume):
+ skilled = self.skilled
+ resumes = self.resumes
+ try:
+ old = resumes[worker]
+ except KeyError:
+ self.backlogs[worker] = 0
+ self.unskilled[0].add(worker)
+ self.event.set()
+ else:
+ for rclass, score in old.iteritems():
+ workers = skilled[rclass]
+ del workers[bisect.bisect_left(workers, (score, worker))]
+
+ for rclass, score in resume.iteritems():
+ bisect.insort(skilled.setdefault(rclass, []), (score, worker))
+
+ resumes[worker] = resume
+
+ def remove(self, worker):
+ self.new_resume(worker, {})
+ backlog = self.backlogs.pop(worker)
+ self.unskilled[backlog].remove(worker)
+ del self.resumes[worker]
+
+ def get(self, rclass, timeout=None):
+ """Get a worker to handle a request class
+ """
+ max_backlog = self.max_backlog
+ backlogs = self.backlogs
+ unskilled = self.unskilled
+ while 1:
+
+ # Look for a skilled worker
+ best_score = 0
+ for score, worker in reversed(self.skilled.get(rclass, ())):
+ backlog = backlogs[worker] + 1
+ if backlog > max_backlog:
+ continue
+ score /= backlog
+ if score <= best_score:
+ break
+ best_score = score
+ best_backlog = backlog
+ best_worker = worker
+
+ if best_score:
+ unskilled[best_backlog-1].remove(best_worker)
+ unskilled[best_backlog].add(best_worker)
+ backlogs[best_worker] = best_backlog
+ return best_worker
+
+ # Look for an unskilled worker
+ for backlog, workers in enumerate(unskilled):
+ if workers:
+ worker = workers.pop()
+ backlog += 1
+ try:
+ unskilled[backlog].add(worker)
+ except IndexError:
+ workers.add(worker)
+ else:
+ backlogs[worker] = backlog
+ resume = self.resumes[worker]
+ if rclass not in resume:
+ self.resumes[worker][rclass] = 1.0
+ bisect.insort(self.skilled.setdefault(rclass, []),
+ (1.0, worker))
+ return worker
+
+ # Dang. Couldn't find a worker, either because we don't
+ # have any yet, or because they're all too busy.
+ self.event.clear()
+ self.event.wait(timeout)
+ if timeout is not None and not self.event.is_set():
+ return None
+
+ def put(self, worker):
+ backlogs = self.backlogs
+ unskilled = self.unskilled
+ backlog = backlogs[worker]
+ unskilled[backlog].remove(worker)
+ backlog -= 1
+ unskilled[backlog].add(worker)
+ backlogs[worker] = backlog
+ self.event.set()
+
class Worker(zc.resumelb.util.Worker):
- def __init__(self, server, socket, addr):
- self.server = server
+ maxrno = (1<<32) - 1
+
+ def __init__(self, pool, socket, addr):
+ self.pool = pool
self.nrequest = 0
- self.resume = {}
readers = self.connected(socket, addr)
- self.register()
- while self.connected:
+ while self.is_connected:
try:
rno, data = zc.resumelb.util.read_message(socket)
except gevent.GreenletExit:
@@ -79,20 +173,20 @@
return
if rno == 0:
- self.unregister()
- self.resume = data
- self.register()
+ pool.new_resume(self, data)
else:
readers[rno](data)
- def handle(self, env, start_response):
+ def handle(self, rclass, env, start_response):
logger.debug('handled by %s', self.addr)
env = env.copy()
err = env.pop('wsgi.errors')
input = env.pop('wsgi.input')
+ env['zc.resumelb.request_class'] = rclass
- rno = self.nrequest = self.nrequest + 1
+ rno = self.nrequest + 1
+ self.nrequest = rno % self.maxrno
get = self.start(rno)
self.put((rno, env))
@@ -128,39 +222,8 @@
return content()
- def register(self):
- resume = self.resume
- self.score = - sum(resume.itervalues())
- workers_by = self.server.workers
-
- def _register(workers, score):
- item = score, self
- index = bisect.bisect_left(workers, item)
- workers.insert(index, item)
-
- _register(workers_by[None], self.score)
-
- for site, score in resume.iteritems():
- workers = workers_by.get(site)
- if workers is None:
- workers = workers_by[site] = []
- _register(workers, score)
-
- self.server.workers_event.set()
-
- def unregister(self):
- workers_by = self.server.workers
-
- def _unregister(workers, score):
- index = bisect.bisect_left(workers, (score, self))
- del workers[index]
-
- _unregister(workers_by[None], self.score)
- for site, score in self.resume.iteritems():
- _unregister(workers_by[site], score)
-
def disconnected(self):
- self.unregister()
+ self.pool.remove(self)
zc.resumelb.util.Worker.disconnected(self)
def parse_addr(addr):
@@ -175,9 +238,9 @@
args = sys.argv[1:]
logging.basicConfig(level=logging.INFO)
- laddr, waddr = args
- Server(parse_addr(laddr), parse_addr(waddr), host_classifier)
+ wsgi_addr, lb_addr = map(parse_addr, args)
- gevent.hub.get_hub().switch()
+ lb = LB(lb_addr, host_classifier)
+ gevent.pywsgi.WSGIServer(wsgi_addr, lb.handle_wsgi).serve_forever()
Added: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2011-11-06 16:49:38 UTC (rev 123293)
@@ -0,0 +1,204 @@
+Resume-based load balancer pool
+===============================
+
+The heart of the resume-based load balancer is the pool, which
+implements the load balancing algorithm. The pool has a collection of
+workers organized according to their resumes.
+
+The load balancer works by accepting remote worker connections and
+adding local workers to the pool, and by accepting wsgi request,
+getting local workers from the pool and passing the wsgi requests to
+the local workers, which, in term, forwward the requests to the remote
+workers.
+
+We'll test the pool with stand-ins for the local workers.
+
+ >>> import zc.resumelb.lb
+ >>> pool = zc.resumelb.lb.Pool(max_backlog=5)
+
+We specified a maximum per-worker backlog for the pool. We specified
+a fairly low max backlog to make it easier to see what happens when
+a worker gets too backed up.
+
+The get method is used to get a worker from the pool. A request class
+and an optional timeout is passed. (The timeout is mainly useful for
+testing.)
+
+ >>> pool.get('foo', 0.0)
+
+We didn't get a worker (we timed out), because we haven't added one.
+
+ >>> class Worker:
+ ... def __init__(self, name):
+ ... self.name = name
+ ... def __repr__(self):
+ ... return self.name
+ ... def __cmp__(self, other):
+ ... return cmp(self.name, other.name)
+ ... def __hash__(self):
+ ... return hash(self.name)
+
+ >>> w1 = Worker('w1')
+ >>> pool.new_resume(w1, {})
+
+As far as the pool is concerned, any object that can be in a set or be
+used as a dictionary key can be used as a worker. The pool doesn't
+care. We could have used ``object`` as our worker class, but we
+constructed a worker class that makes testing output more useful.
+
+ >>> pool.get('foo', 0.0)
+ w1
+
+ This time, we got the one we registered.
+
+ If we create another and register it, we'll still get the original:
+
+ >>> w2 = Worker('w2')
+ >>> pool.new_resume(w2, {})
+
+ >>> pool.get('foo')
+ w1
+
+ This is because w1 is known to be good at handling foo requests.
+
+ We'll get w2 if we pick a different request class:
+
+ >>> pool.get('bar')
+ w2
+
+ We're gonna be white box and look at the pool data structures from
+ time to time.
+
+ >>> pool
+ Request classes:
+ bar: w2(1.0,1)
+ foo: w1(1.0,2)
+ Backlogs:
+ 1: [w2]
+ 2: [w1]
+
+Here, we can see that w1 is used for the foo class and w2 for the bar
+class. Let's add another worker:
+
+ >>> w3 = Worker('w3')
+ >>> pool.new_resume(w3, {})
+
+and make some more foo requests:
+
+ >>> [pool.get('foo') for i in range(3)]
+ [w1, w1, w1]
+
+ >>> pool
+ Request classes:
+ bar: w2(1.0,1)
+ foo: w1(1.0,5)
+ Backlogs:
+ 0: [w3]
+ 1: [w2]
+ 5: [w1]
+
+Even though we still had a worker with no backlog, we kept sending
+requests to w1. But now, w1 has reached its maximum backlog. If
+we make another foo request, we'll start using w3, and when that's
+reached its maximum backlog, we'll start using w2:
+
+ >>> [pool.get('foo') for i in range(7)]
+ [w3, w3, w3, w3, w3, w2, w2]
+
+ >>> pool
+ Request classes:
+ bar: w2(1.0,3)
+ foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
+ Backlogs:
+ 3: [w2]
+ 5: [w1, w3]
+
+If we get all workers to the maximum backlog, we'll block until a
+worker is free.
+
+ >>> [pool.get('foo') for i in range(2)]
+ [w2, w2]
+
+ >>> pool.get('foo', 0.0)
+
+When a worker is done doing its work, we put it back in the pool:
+
+ >>> pool.put(w1)
+ >>> pool.put(w1)
+ >>> pool.put(w1)
+ >>> pool.put(w2)
+ >>> pool.put(w3)
+ >>> pool.put(w3)
+ >>> pool
+ Request classes:
+ bar: w2(1.0,4)
+ foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
+ Backlogs:
+ 2: [w1]
+ 3: [w3]
+ 4: [w2]
+
+Now, when we get a worker, we'll get w3.
+
+ >>> pool.get('foo', 0.0)
+ w3
+
+Why? We adjust each score by the worker's backlog and search workers
+from high score to low until the adjusted score increases. Because w2
+has a higher backlog than w3, its adjusted score is lower so we stop
+looking. This is for 2 reasons:
+
+1. We want to bias selection towards a smaller number of workers,
+ ideally those with the best scores,
+
+2. We want to reduce the amount of work done in each get call.
+
+Now that we've done some work, let's update the resumes. This will
+normally be done by workers after periodically collecting performance
+data.
+
+ >>> pool.new_resume(w1, {'foo': 3.0})
+
+ >>> pool.new_resume(w2, {'bar': 1.0, 'foo': 1.0})
+
+ >>> pool.new_resume(w3, {'foo': 2.0})
+
+ pool
+ Request classes:
+ bar: w2(1.0,4)
+ foo: w2(1.0,4), w3(2.0,4), w1(3.0,2)
+ Backlogs:
+ 2: [w1]
+ 4: [w2, w3]
+
+ >>> [pool.get('foo') for i in range(5)]
+ [w1, w1, w1, w3, w2]
+
+ >>> pool.put(w1)
+ >>> pool.put(w3)
+ >>> pool.put(w3)
+ >>> pool.put(w3)
+ >>> pool.put(w3)
+ >>> pool.put(w3)
+ >>> pool
+ Request classes:
+ bar: w2(1.0,5)
+ foo: w2(1.0,5), w3(2.0,0), w1(3.0,4)
+ Backlogs:
+ 0: [w3]
+ 4: [w1]
+ 5: [w2]
+
+ >>> [pool.get('foo') for i in range(5)]
+ [w3, w3, w3, w1, w3]
+
+When a worker disconnects, it's removed from the pool:
+
+ >>> pool.remove(w1)
+ >>> pool
+ Request classes:
+ bar: w2(1.0,5)
+ foo: w2(1.0,5), w3(2.0,4)
+ Backlogs:
+ 4: [w3]
+ 5: [w2]
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py 2011-11-06 16:49:36 UTC (rev 123292)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py 2011-11-06 16:49:38 UTC (rev 123293)
@@ -53,7 +53,9 @@
return unittest.TestSuite((
manuel.testing.TestSuite(
manuel.doctest.Manuel() + manuel.capture.Manuel(),
- 'worker.test',
+ *(sorted(name for name in os.listdir(os.path.dirname(__file__))
+ if name.endswith('.test')
+ )),
setUp=setUp),
))
More information about the checkins
mailing list