[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Separated core lb logic into separate pool class to facilitate

Jim Fulton jim at zope.com
Sun Nov 6 16:49:39 UTC 2011


Log message for revision 123293:
  Separated core lb logic into separate pool class to facilitate
  testing.  Added logic for dealing with backlogs.  Refactored data
  structures a lot.
  

Changed:
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/tests.py

-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2011-11-06 16:49:36 UTC (rev 123292)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2011-11-06 16:49:38 UTC (rev 123293)
@@ -11,67 +11,161 @@
 
 logger = logging.getLogger(__name__)
 
-class Server:
+class LB:
 
-    def __init__(self, wsgi_addr, worker_addr, classifier):
-        self.workers = {None: []}
-        self.workers_event = gevent.event.Event()
-        self.wsgi_addr = wsgi_addr
-        self.worker_addr = worker_addr
+    def __init__(self, worker_addr, classifier):
         self.classifier = classifier
-        self.worker_server = gevent.server.StreamServer(
-            worker_addr, self.handle_worker)
-        self.worker_server.start()
-        self.wsgi_server = gevent.pywsgi.WSGIServer(wsgi_addr, self.handle_wsgi)
-        self.wsgi_server.start()
+        self.pool = Pool()
+        gevent.server.StreamServer(worker_addr, self.handle_worker).start()
 
     def handle_worker(self, socket, addr):
         logger.info('new worker')
-        Worker(self, socket, addr)
+        Worker(self.pool, socket, addr)
 
     def handle_wsgi(self, env, start_response):
         rclass = self.classifier(env)
         logger.debug('wsgi: %s', rclass)
-        env['zc.resumelb.request_class'] = rclass
 
         while 1:
-            workers = self.workers.get(rclass)
-            if workers:
-                worker = workers[-1][1]
-            else:
-                workers = self.workers.get(None)
-                if workers:
-                    worker = workers[-1][1]
-                    assert rclass not in worker.resume
-                    worker.unregister()
-                    worker.resume[rclass] = 1
-                    worker.register()
-                else:
-                    self.workers_event.clear()
-                    self.workers_event.wait()
-                    continue
-
+            worker = self.pool.get(rclass)
             try:
-                return worker.handle(env, start_response)
+                return worker.handle(rclass, env, start_response)
             except worker.Disconnected:
                 # XXX need to be more careful about whether
                 # start_response was called.
-                if int(env.get(CONTENT_LENGTH, 0)) == 0:
+                if int(env.get(CONTENT_LENGTH) or 0) == 0:
                     logger.info("retrying %s", env)
                 else:
                     raise
+            finally:
+                self.pool.put(worker)
 
+class Pool:
+
+    def __init__(self, max_backlog=40):
+        self.max_backlog = max_backlog
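+        # unskilled[n]: the set of workers whose current backlog is n.
+        # skilled: request class -> sorted list of (score, worker).
+        # resumes: worker -> {request class: score}.
+        # backlogs: worker -> current backlog.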
+        self.unskilled = [set() for i in range(max_backlog+1)]
+        self.skilled = {}
+        self.resumes = {}
+        self.backlogs = {}
+        self.event = gevent.event.Event()
+
+    def __repr__(self):
+        skilled = self.skilled
+        backlogs = self.backlogs
+        outl = []
+        out = outl.append
+        out('Request classes:')
+        for rclass in sorted(skilled):
+            out('  %s: %s'
+                % (rclass,
+                   ', '.join('%s(%s,%s)' % (worker, score, backlogs[worker])
+                             for (score, worker) in skilled[rclass])
+                   ))
+        out('Backlogs:')
+        for backlog, workers in enumerate(self.unskilled):
+            if workers:
+                out('  %s: %s' % (backlog, sorted(workers)))
+        return '\n'.join(outl)
+
+    def new_resume(self, worker, resume):
+        skilled = self.skilled
+        resumes = self.resumes
+        try:
+            old = resumes[worker]
+        except KeyError:
+            self.backlogs[worker] = 0
+            self.unskilled[0].add(worker)
+            self.event.set()
+        else:
+            for rclass, score in old.iteritems():
+                workers = skilled[rclass]
+                del workers[bisect.bisect_left(workers, (score, worker))]
+
+        for rclass, score in resume.iteritems():
+            bisect.insort(skilled.setdefault(rclass, []), (score, worker))
+
+        resumes[worker] = resume
+
+    def remove(self, worker):
+        self.new_resume(worker, {})
+        backlog = self.backlogs.pop(worker)
+        self.unskilled[backlog].remove(worker)
+        del self.resumes[worker]
+
+    def get(self, rclass, timeout=None):
+        """Get a worker to handle a request class
+        """
+        max_backlog = self.max_backlog
+        backlogs = self.backlogs
+        unskilled = self.unskilled
+        while 1:
+
+            # Look for a skilled worker
+            best_score = 0
+            for score, worker in reversed(self.skilled.get(rclass, ())):
+                backlog = backlogs[worker] + 1
+                if backlog > max_backlog:
+                    continue
+                score /= backlog
+                if score <= best_score:
+                    break
+                best_score = score
+                best_backlog = backlog
+                best_worker = worker
+
+            if best_score:
+                unskilled[best_backlog-1].remove(best_worker)
+                unskilled[best_backlog].add(best_worker)
+                backlogs[best_worker] = best_backlog
+                return best_worker
+
+            # Look for an unskilled worker
+            for backlog, workers in enumerate(unskilled):
+                if workers:
+                    worker = workers.pop()
+                    backlog += 1
+                    try:
+                        unskilled[backlog].add(worker)
+                    except IndexError:
+                        workers.add(worker)
+                    else:
+                        backlogs[worker] = backlog
+                        resume = self.resumes[worker]
+                        if rclass not in resume:
+                            self.resumes[worker][rclass] = 1.0
+                            bisect.insort(self.skilled.setdefault(rclass, []),
+                                          (1.0, worker))
+                        return worker
+
+            # Dang. Couldn't find a worker, either because we don't
+            # have any yet, or because they're all too busy.
+            self.event.clear()
+            self.event.wait(timeout)
+            if timeout is not None and not self.event.is_set():
+                return None
+
+    def put(self, worker):
+        backlogs = self.backlogs
+        unskilled = self.unskilled
+        backlog = backlogs[worker]
+        unskilled[backlog].remove(worker)
+        backlog -= 1
+        unskilled[backlog].add(worker)
+        backlogs[worker] = backlog
+        self.event.set()
+
 class Worker(zc.resumelb.util.Worker):
 
-    def __init__(self, server, socket, addr):
-        self.server = server
+    maxrno = (1<<32) - 1
+
+    def __init__(self, pool, socket, addr):
+        self.pool = pool
         self.nrequest = 0
-        self.resume = {}
 
         readers = self.connected(socket, addr)
-        self.register()
 
-        while self.connected:
+        while self.is_connected:
             try:
                 rno, data = zc.resumelb.util.read_message(socket)
             except gevent.GreenletExit:
@@ -79,20 +173,20 @@
                 return
 
             if rno == 0:
-                self.unregister()
-                self.resume = data
-                self.register()
+                pool.new_resume(self, data)
             else:
                 readers[rno](data)
 
-    def handle(self, env, start_response):
+    def handle(self, rclass, env, start_response):
         logger.debug('handled by %s', self.addr)
 
         env = env.copy()
         err = env.pop('wsgi.errors')
         input = env.pop('wsgi.input')
+        env['zc.resumelb.request_class'] = rclass
 
-        rno = self.nrequest = self.nrequest + 1
+        rno = self.nrequest + 1
+        self.nrequest = rno % self.maxrno
         get = self.start(rno)
 
         self.put((rno, env))
@@ -128,39 +222,8 @@
 
         return content()
 
-    def register(self):
-        resume = self.resume
-        self.score = - sum(resume.itervalues())
-        workers_by = self.server.workers
-
-        def _register(workers, score):
-            item = score, self
-            index = bisect.bisect_left(workers, item)
-            workers.insert(index, item)
-
-        _register(workers_by[None], self.score)
-
-        for site, score in resume.iteritems():
-            workers = workers_by.get(site)
-            if workers is None:
-                workers = workers_by[site] = []
-            _register(workers, score)
-
-        self.server.workers_event.set()
-
-    def unregister(self):
-        workers_by = self.server.workers
-
-        def _unregister(workers, score):
-            index = bisect.bisect_left(workers, (score, self))
-            del workers[index]
-
-        _unregister(workers_by[None], self.score)
-        for site, score in self.resume.iteritems():
-            _unregister(workers_by[site], score)
-
     def disconnected(self):
-        self.unregister()
+        self.pool.remove(self)
         zc.resumelb.util.Worker.disconnected(self)
 
 def parse_addr(addr):
@@ -175,9 +238,9 @@
         args = sys.argv[1:]
 
     logging.basicConfig(level=logging.INFO)
-    laddr, waddr = args
-    Server(parse_addr(laddr), parse_addr(waddr), host_classifier)
+    wsgi_addr, lb_addr = map(parse_addr, args)
 
-    gevent.hub.get_hub().switch()
+    lb = LB(lb_addr, host_classifier)
+    gevent.pywsgi.WSGIServer(wsgi_addr, lb.handle_wsgi).serve_forever()
 
 

Added: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2011-11-06 16:49:38 UTC (rev 123293)
@@ -0,0 +1,204 @@
+Resume-based load balancer pool
+===============================
+
+The heart of the resume-based load balancer is the pool, which
+implements the load balancing algorithm.  The pool has a collection of
+workers organized according to their resumes.
+
+The load balancer works by accepting remote worker connections and
+adding local workers to the pool, and by accepting WSGI requests,
+getting local workers from the pool, and passing the requests to the
+local workers, which, in turn, forward them to the remote workers.
+
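+The request flow through the load balancer looks roughly like this (a
+simplified sketch of ``LB.handle_wsgi``, with the retry logic left
+out)::
+
+    worker = pool.get(rclass)  # may block until a worker is available
+    try:
+        return worker.handle(rclass, env, start_response)
+    finally:
+        pool.put(worker)       # decrement the worker's backlog
+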
+We'll test the pool with stand-ins for the local workers.
+
+    >>> import zc.resumelb.lb
+    >>> pool = zc.resumelb.lb.Pool(max_backlog=5)
+
+We specified a maximum per-worker backlog for the pool.  We chose a
+fairly low maximum to make it easier to see what happens when a
+worker gets too backed up.
+
+The get method is used to get a worker from the pool.  A request class
+and an optional timeout are passed.  (The timeout is mainly useful for
+testing.)
+
+    >>> pool.get('foo', 0.0)
+
+We didn't get a worker (we timed out), because we haven't added one.
+
+    >>> class Worker:
+    ...     def __init__(self, name):
+    ...         self.name = name
+    ...     def __repr__(self):
+    ...         return self.name
+    ...     def __cmp__(self, other):
+    ...         return cmp(self.name, other.name)
+    ...     def __hash__(self):
+    ...         return hash(self.name)
+
+    >>> w1 = Worker('w1')
+    >>> pool.new_resume(w1, {})
+
+As far as the pool is concerned, any object that can be in a set or be
+used as a dictionary key can be used as a worker.  The pool doesn't
+care.  We could have used ``object`` as our worker class, but we
+constructed a worker class that makes testing output more useful.
+
+    >>> pool.get('foo', 0.0)
+    w1
+
+This time, we got the one we registered.
+
+If we create another and register it, we'll still get the original:
+
+    >>> w2 = Worker('w2')
+    >>> pool.new_resume(w2, {})
+
+    >>> pool.get('foo')
+    w1
+
+This is because w1 is known to be good at handling foo requests.
+
+We'll get w2 if we pick a different request class:
+
+    >>> pool.get('bar')
+    w2
+
+We're going to take a white-box view and look at the pool data
+structures from time to time.
+
+    >>> pool
+    Request classes:
+      bar: w2(1.0,1)
+      foo: w1(1.0,2)
+    Backlogs:
+      1: [w2]
+      2: [w1]
+
+Here, we can see that w1 is used for the foo class and w2 for the bar
+class.  Let's add another worker:
+
+    >>> w3 = Worker('w3')
+    >>> pool.new_resume(w3, {})
+
+and make some more foo requests:
+
+    >>> [pool.get('foo') for i in range(3)]
+    [w1, w1, w1]
+
+    >>> pool
+    Request classes:
+      bar: w2(1.0,1)
+      foo: w1(1.0,5)
+    Backlogs:
+      0: [w3]
+      1: [w2]
+      5: [w1]
+
+Even though we still had a worker with no backlog, we kept sending
+requests to w1.  But now w1 has reached its maximum backlog.  If we
+make another foo request, we'll start using w3, and when w3 has
+reached its maximum backlog, we'll start using w2:
+
+    >>> [pool.get('foo') for i in range(7)]
+    [w3, w3, w3, w3, w3, w2, w2]
+
+    >>> pool
+    Request classes:
+      bar: w2(1.0,3)
+      foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
+    Backlogs:
+      3: [w2]
+      5: [w1, w3]
+
+If we get all workers to the maximum backlog, we'll block until a
+worker is free.
+
+    >>> [pool.get('foo') for i in range(2)]
+    [w2, w2]
+
+    >>> pool.get('foo', 0.0)
+
+When a worker is done doing its work, we put it back in the pool:
+
+    >>> pool.put(w1)
+    >>> pool.put(w1)
+    >>> pool.put(w1)
+    >>> pool.put(w2)
+    >>> pool.put(w3)
+    >>> pool.put(w3)
+    >>> pool
+    Request classes:
+      bar: w2(1.0,4)
+      foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
+    Backlogs:
+      2: [w1]
+      3: [w3]
+      4: [w2]
+
+Now, when we get a worker, we'll get w3.
+
+    >>> pool.get('foo', 0.0)
+    w3
+
+Why? We adjust each score by the worker's backlog and search workers
+from high score to low until the adjusted score stops improving.
+Because w2 has a higher backlog than w3, its adjusted score is lower,
+so we stop looking (a worked example follows the list below).  This
+is for two reasons:
+
+1. We want to bias selection towards a smaller number of workers,
+   ideally those with the best scores,
+
+2. We want to reduce the amount of work done in each get call.
+
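+For example, given the state above, all of the raw scores are 1.0.
+The search starts with w3, whose adjusted score is 1.0 / (3 + 1) =
+0.25.  The next candidate, w2, would have an adjusted score of
+1.0 / (4 + 1) = 0.2, which is no better, so the search stops and w3
+is returned.
+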
+Now that we've done some work, let's update the resumes.  This will
+normally be done by the workers themselves, which periodically
+collect performance data.
+
+    >>> pool.new_resume(w1, {'foo': 3.0})
+
+    >>> pool.new_resume(w2, {'bar': 1.0, 'foo': 1.0})
+
+    >>> pool.new_resume(w3, {'foo': 2.0})
+
+    >>> pool
+    Request classes:
+      bar: w2(1.0,4)
+      foo: w2(1.0,4), w3(2.0,4), w1(3.0,2)
+    Backlogs:
+      2: [w1]
+      4: [w2, w3]
+
+    >>> [pool.get('foo') for i in range(5)]
+    [w1, w1, w1, w3, w2]
+
+    >>> pool.put(w1)
+    >>> pool.put(w3)
+    >>> pool.put(w3)
+    >>> pool.put(w3)
+    >>> pool.put(w3)
+    >>> pool.put(w3)
+    >>> pool
+    Request classes:
+      bar: w2(1.0,5)
+      foo: w2(1.0,5), w3(2.0,0), w1(3.0,4)
+    Backlogs:
+      0: [w3]
+      4: [w1]
+      5: [w2]
+
+    >>> [pool.get('foo') for i in range(5)]
+    [w3, w3, w3, w1, w3]
+
+When a worker disconnects, it is removed from the pool:
+
+    >>> pool.remove(w1)
+    >>> pool
+    Request classes:
+      bar: w2(1.0,5)
+      foo: w2(1.0,5), w3(2.0,4)
+    Backlogs:
+      4: [w3]
+      5: [w2]


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2011-11-06 16:49:36 UTC (rev 123292)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2011-11-06 16:49:38 UTC (rev 123293)
@@ -53,7 +53,9 @@
     return unittest.TestSuite((
         manuel.testing.TestSuite(
             manuel.doctest.Manuel() + manuel.capture.Manuel(),
-            'worker.test',
+            *(sorted(name for name in os.listdir(os.path.dirname(__file__))
+                     if name.endswith('.test')
+                     )),
             setUp=setUp),
         ))
 


