[Checkins] SVN: Sandbox/J1m/resumelb/s New simpler lb algorithm. More tweaking in future, but enough for now.

Jim Fulton jim at zope.com
Sun Jan 15 18:41:50 UTC 2012


Log message for revision 124054:
  New simpler lb algorithm.  More tweaking in future, but enough for now.
  

Changed:
  U   Sandbox/J1m/resumelb/setup.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/pool.test

-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2012-01-15 18:41:47 UTC (rev 124053)
+++ Sandbox/J1m/resumelb/setup.py	2012-01-15 18:41:49 UTC (rev 124054)
@@ -15,7 +15,7 @@
 
 install_requires = [
     'setuptools', 'gevent', 'WebOb', 'zc.thread', 'zc.parse_addr',
-    'zc.mappingobject']
+    'zc.mappingobject', 'llist']
 extras_require = dict(
     test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-01-15 18:41:47 UTC (rev 124053)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-01-15 18:41:49 UTC (rev 124054)
@@ -3,6 +3,7 @@
 import gevent.hub
 import gevent.pywsgi
 import gevent.server
+import llist
 import logging
 import sys
 import webob
@@ -66,15 +67,11 @@
 
     def __init__(self, settings=None):
         if settings is None:
-            settings = dict(
-                max_backlog = 40,
-                unskilled_score = 1.0,
-                )
+            settings = {}
         self.settings = settings
         self.workers = set()
-        self.unskilled = [] # sorted([(uscore, poolworker)])
+        self.unskilled = llist.dllist()
         self.skilled = {}   # rclass -> {(score, workers)}
-        self.nskills = 0    # sum of resume lengths
         self.event = gevent.event.Event()
 
     def __repr__(self):
@@ -101,51 +98,39 @@
     def new_resume(self, worker, resume=None):
         skilled = self.skilled
         unskilled = self.unskilled
-        if worker in self.workers:
-            if worker.backlog < self.settings['max_backlog']:
-                del unskilled[bisect_left(unskilled, (worker.uscore, worker))]
+        workers = self.workers
+
+        target_skills_per_worker = 1 + (
+            self.settings.get('redundancy', 1) * len(skilled) /
+            (len(workers) or 1))
+
+        if worker in workers:
             for rclass, score in worker.resume.iteritems():
                 skilled[rclass].remove((score, worker))
-            self.nskills -= len(worker.resume)
+            if resume is None:
+                workers.remove(worker)
+                if worker.lnode is not None:
+                    unskilled.remove(worker.lnode)
+                    worker.lnode = None
+                return
         else:
-            self.workers.add(worker)
             worker.backlog = 0
+            workers.add(worker)
+            worker.lnode = unskilled.appendleft(worker)
 
-        if resume is None:
-            self.workers.remove(worker)
-        else:
-            worker.resume = resume
-            self.nskills += len(resume)
-            if resume:
-                scores = sorted(resume.values())
-                worker.unskilled_score = max(
-                    self.settings['unskilled_score'],
-                    scores[
-                        min(
-                            max(3, len(scores)/4),
-                            len(scores)-1,
-                            )
-                        ] / 10.0
-                    )
-            else:
-                worker.unskilled_score = (
-                    self.settings['unskilled_score'] * (1.0 + self.nskills) /
-                    len(self.workers))
+        resumeitems = resume.items()
+        drop = (len(resume) - target_skills_per_worker) / 2
+        if drop > 0:
+            resumeitems = sorted(resumeitems, key=lambda i: i[1])[drop:]
 
-            uscore = (
-                worker.unskilled_score /
-                (1.0 + worker.backlog)
-                )
-            worker.uscore = uscore
-            insort(unskilled, (uscore, worker))
-            for rclass, score in resume.iteritems():
-                try:
-                    skilled[rclass].add((score, worker))
-                except KeyError:
-                    skilled[rclass] = set(((score, worker), ))
+        worker.resume = dict(resumeitems)
+        for rclass, score in resumeitems:
+            try:
+                skilled[rclass].add((score, worker))
+            except KeyError:
+                skilled[rclass] = set(((score, worker), ))
 
-
-        if self.unskilled:
+        if unskilled:
             self.event.set()
 
     def remove(self, worker):
@@ -154,76 +139,91 @@
     def get(self, rclass, timeout=None):
         """Get a worker to handle a request class
         """
-
         unskilled = self.unskilled
         if not unskilled:
             self.event.wait(timeout)
-            if not self.unskilled:
+            if not unskilled:
                 return None
 
         # Look for a skilled worker
-        best_score, unskilled_worker = unskilled[-1]
-        best_worker = best_backlog = None
-        max_backlog = self.settings['max_backlog']
-        skilled = self.skilled.get(rclass, ())
+        max_backlog = self.settings.get('max_backlog', 40)
+        min_score = self.settings.get('min_score', 1.0)
+        best_score = 0
+        best_worker = None
+        skilled = self.skilled.get(rclass)
+        if skilled is None:
+            skilled = self.skilled[rclass] = set()
         for score, worker in skilled:
             backlog = worker.backlog + 1
-            if backlog > max_backlog:
-                continue
+            if backlog > 2:
+                if (
+                    # Don't let a worker get too backed up
+                    backlog > max_backlog or
+
+                    # We use min score as a way of allowing other workers
+                    # a chance to pick up work even if the skilled workers
+                    # haven't reached their backlog.  This is mainly a tuning
+                    # tool for when a worker is doing OK, but maybe still
+                    # doing too much.
+                    (score < min_score and
+                     unskilled and unskilled.first.value.backlog == 0
+                     )
+                    ):
+                    continue
             score /= backlog
-            if (score > best_score
-                or
-                (best_worker is None and worker is unskilled_worker)
-                ):
+            if (score > best_score):
                 best_score = score
                 best_worker = worker
-                best_backlog = backlog
 
-        if best_worker is not None:
-            uscore = best_worker.uscore
-            del unskilled[bisect_left(unskilled, (uscore, best_worker))]
-        else:
-            uscore, best_worker = unskilled.pop()
-            best_backlog = best_worker.backlog + 1
-            self.nskills += 1
-            resume = best_worker.resume
-            score = max(uscore, self.settings['unskilled_score'] * 10)
-            best_worker.resume[rclass] = score
-            if skilled == ():
-                self.skilled[rclass] = set(((score, best_worker),))
-            else:
+        if not best_score:
+            while unskilled.first.value.backlog >= max_backlog:
+                # Edge case.  max_backlog was reduced after a worker
+                # with a larger backlog was added.
+                #import pdb; pdb.set_trace()
+                unskilled.first.value.lnode = None
+                unskilled.popleft()
+                if not unskilled:
+                    # OK, now we need to wait. Just start over.
+                    return self.get(rclass, timeout)
+
+            best_worker = unskilled.first.value
+            if rclass not in best_worker.resume:
+
+                # We now have an unskilled worker and we need to
+                # assign it a score.
+                # - It has to be >= min score, or it won't get future work.
+                # - We want to give it work somewhat gradually.
+                # - We got here because:
+                #   - there are no skilled workers,
+                #   - The skilled workers have all either:
+                #     - Reached their max backlog, or
+                #     - Have scores < min score
+                # Let's set it to min score because either:
+                # - There are no skilled workers, so they'll all get the same
+                # - Other workers are maxed out, or
+                # - The score will be higher than some of the existing, so it'll
+                #   get work
+                # We also allow for an unskilled_score setting to override.
+                score = self.settings.get('unskilled_score', min_score)
+                best_worker.resume[rclass] = score
                 skilled.add((score, best_worker))
-            lresume = len(resume)
-            uscore *= lresume/(lresume + 1.0)
 
-        uscore *= best_backlog / (1.0 + best_backlog)
-        best_worker.uscore = uscore
-        best_worker.backlog = best_backlog
-        if best_backlog < max_backlog:
-            insort(unskilled, (uscore, best_worker))
+        unskilled.remove(best_worker.lnode)
+        best_worker.backlog += 1
+        if best_worker.backlog < max_backlog:
+            best_worker.lnode = unskilled.append(best_worker)
+        else:
+            best_worker.lnode = None
+
         return best_worker
 
     def put(self, worker):
-        backlog = worker.backlog
-        if backlog < 1:
-            return
-        unskilled = self.unskilled
-        max_backlog = self.settings['max_backlog']
-        uscore = worker.uscore
-        if backlog < max_backlog:
-            del unskilled[bisect_left(unskilled, (uscore, worker))]
+        if worker.lnode is None:
+            worker.lnode = self.unskilled.append(worker)
+            self.event.set()
+        if worker.backlog:
+            worker.backlog -= 1
 
-        uscore *= (backlog + 1.0) / backlog
-        worker.uscore = uscore
-
-        backlog -= 1
-        worker.backlog = backlog
-
-        if backlog < max_backlog:
-            insort(unskilled, (uscore, worker))
-
-        self.event.set()
-
 class Worker(zc.resumelb.util.Worker):
 
     maxrno = (1<<32) - 1

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-01-15 18:41:47 UTC (rev 124053)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-01-15 18:41:49 UTC (rev 124054)
@@ -1,3 +1,4 @@
+===============================
 Resume-based load balancer pool
 ===============================
 
@@ -11,15 +12,14 @@
 the local workers, which, in term, forwward the requests to the remote
 workers.
 
-We'll test the pool with stand-ins for the local workers.
+We'll test the pool with stand-ins for the local workers.  The pool
+constructor takes a settings mapping object.  This allows the settings
+to be managed in real time.
 
     >>> import zc.resumelb.lb
-    >>> pool = zc.resumelb.lb.Pool(dict(max_backlog=5, unskilled_score=1.0))
+    >>> settings = {}
+    >>> pool = zc.resumelb.lb.Pool(settings)
 
-We specified a maximum per-worker backlog for the pool.  We specified
-a fairly low max backlog to make it easier to see what happens when
-a worker gets too backed up.
-
 The get method is used to get a worker from the pool.  A request class
 and an optional timeout is passed. (The timeout is mainly useful for
 testing.)
@@ -42,25 +42,25 @@
     ...     Disconnected = None
 
     >>> w1 = Worker('w1')
+
     >>> pool.new_resume(w1, {})
 
 As far as the pool is concerned, any object that can be in a set or be
 used as a dictionary key can be used as a worker.  The pool doesn't
-care.  We could have used ``object`` as out worker class, but we
-constructed a worker class that makes testing output more useful.
+care.  The pool does add some extra attributes to workers.
 
-    >>> pool.get('foo', 0.0)
-    w1
+   >>> pool.get('foo', 0.0)
+   w1
 
- This time, we got the one we registered.
+This time, we got the one we registered.
 
- If we create another and register it, we'll still get the original:
+If we create another and register it, we'll still get the original:
 
-    >>> w2 = Worker('w2')
-    >>> pool.new_resume(w2, {})
+   >>> w2 = Worker('w2')
+   >>> pool.new_resume(w2, {})
 
-    >>> pool.get('foo')
-    w1
+   >>> pool.get('foo')
+   w1
 
  This is because w1 is known to be good at handling foo requests.
 
@@ -74,15 +74,20 @@
 
     >>> pool
     Request classes:
-      bar: w2(10.0,1)
-      foo: w1(10.0,2)
+      bar: w2(1.0,1)
+      foo: w1(1.0,2)
     Backlogs:
       1: [w2]
       2: [w1]
 
 Here, we can see that w1 is used for the foo class and w2 for the bar
-class.  Let's add another worker:
+class.  In the request classes, the worker's score and its overall
+backlog are shown in parentheses.  We see that both workers have a score
+of 1.0.  This is the default score for new workers.  We'll say more
+about this later.
 
+Let's add another worker:
+
     >>> w3 = Worker('w3')
     >>> pool.new_resume(w3, {})
 
@@ -93,15 +98,21 @@
 
     >>> pool
     Request classes:
-      bar: w2(10.0,1)
-      foo: w1(10.0,5)
+      bar: w2(1.0,1)
+      foo: w1(1.0,5)
     Backlogs:
       0: [w3]
       1: [w2]
       5: [w1]
 
 Even though we still had a worker with no backlog, we kept sending
-requests to w1.  But but now, w1 has reached it's maximum backlog.  If
+requests to w1.  This is because w1 hasn't reached its maximum
+backlog.  Also, its score is not less than the min score, which
+defaults to 1.0.  Let's reduce the maximum backlog to 5:
+
+    >>> settings['max_backlog'] = 5
+
+So now, w1 has reached its maximum backlog.  If
 we make another foo request, we'll start using w3, and when that's
 reached it's maximum backlog, we'll start using w2:
 
@@ -110,8 +121,8 @@
 
     >>> pool
     Request classes:
-      bar: w2(10.0,3)
-      foo: w1(10.0,5), w2(10.0,3), w3(10.0,5)
+      bar: w2(1.0,3)
+      foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
     Backlogs:
       3: [w2]
       5: [w1, w3]
@@ -134,8 +145,8 @@
     >>> pool.put(w3)
     >>> pool
     Request classes:
-      bar: w2(10.0,4)
-      foo: w1(10.0,2), w2(10.0,4), w3(10.0,3)
+      bar: w2(1.0,4)
+      foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
     Backlogs:
       2: [w1]
       3: [w3]
@@ -146,22 +157,24 @@
     >>> pool.get('foo', 0.0)
     w1
 
-Why? We adjust each score by the worker's backlog.
+Why? We adjust each score by the worker's backlog, so even though all
+3 workers had the same score, w1 is chosen because it has the smallest
+backlog.
 
-Now that we've done some work, let's updaye the resumes.  This will
-normally be done by workers after periodically collecting performance
+Now that we've done some work, let's update the resumes.  This will
+normally be done by workers periodically, after collecting performance
 data.
 
-    >>> pool.new_resume(w1, {'foo': 30.0})
+    >>> pool.new_resume(w1, {'foo': 6.0})
 
-    ;>>> pool.new_resume(w2, {'bar': 10.0, 'foo': 10.0})
+    >>> pool.new_resume(w2, {'bar': 2.0, 'foo': 2.0})
 
-    >>> pool.new_resume(w3, {'foo': 19.0})
+    >>> pool.new_resume(w3, {'foo': 3.8})
 
     >>> pool
     Request classes:
-      bar: w2(10.0,4)
-      foo: w2(10.0,4), w3(19.0,3), w1(30.0,3)
+      bar: w2(2.0,4)
+      foo: w2(2.0,4), w3(3.8,3), w1(6.0,3)
     Backlogs:
       3: [w1, w3]
       4: [w2]
@@ -173,8 +186,8 @@
 
     >>> pool
     Request classes:
-      bar: w2(10.0,4)
-      foo: w2(10.0,4), w3(19.0,3), w1(30.0,5)
+      bar: w2(2.0,4)
+      foo: w2(2.0,4), w3(3.8,3), w1(6.0,5)
     Backlogs:
       3: [w3]
       4: [w2]
@@ -197,8 +210,8 @@
     >>> pool.put(w3)
     >>> pool
     Request classes:
-      bar: w2(10.0,5)
-      foo: w2(10.0,5), w3(19.0,0), w1(30.0,4)
+      bar: w2(2.0,5)
+      foo: w2(2.0,5), w3(3.8,0), w1(6.0,4)
     Backlogs:
       0: [w3]
       4: [w1]
@@ -207,13 +220,164 @@
     >>> [pool.get('foo') for i in range(5)]
     [w3, w3, w3, w1, w3]
 
+Pool settings
+=============
+
+There are several settings that affect pools:
+
+redundancy
+  Target number of workers for each request class, defaulting to 1.
+
+max_backlog
+  Maximum worker backlog, defaulting to 40.
+
+min_score
+  A worker won't be used if it has a backlog greater than 1 and its
+  score is less than min_score.
+
+unskilled_score
+  The score assigned to workers when given a new request class.  This
+  defaults to min_score.
+
+We've already seen max_backlog at work.  Let's test the other
+settings.
+
+redundancy
+----------
+
+Given a redundancy, we can compute an expected number of request
+classes per worker (resume size), which is the number of request
+classes divided by the number of workers times the redundancy.  With
+special handling for no workers or request classes, this works out
+to::
+
+  1 + redundancy * n_request_classes / max(nworkers, 1)
+
+When we get a new resume and it is larger than the expected size, we
+discard half of the excess number of items with the lowest score.
+Given the pool data:
+
+    >>> pool
+    Request classes:
+      bar: w2(2.0,5)
+      foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
+    Backlogs:
+      4: [w3]
+      5: [w1, w2]
+
+We see there are 2 request classes and 3 workers, so we expect one
+request class per worker.
+
+Let's add a new worker with a much larger resume:
+
+    >>> w4 = Worker('w4')
+    >>> pool.new_resume(w4, dict((str(i), float(i)) for i in range(9)))
+
+When we look at the pool, we see that 4 of the items were discarded:
+
+    >>> pool
+    Request classes:
+      4: w4(4.0,0)
+      5: w4(5.0,0)
+      6: w4(6.0,0)
+      7: w4(7.0,0)
+      8: w4(8.0,0)
+      bar: w2(2.0,5)
+      foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
+    Backlogs:
+      0: [w4]
+      4: [w3]
+      5: [w1, w2]
+
+Now we have 7 request classes and 4 workers.  If we set redundancy to
+3, then the expected resume size is 6, so::
+
+    >>> settings['redundancy'] = 3
+    >>> pool.new_resume(w4, dict((str(i), float(i)) for i in range(9)))
+
+    >>> pool
+    Request classes:
+      1: w4(1.0,0)
+      2: w4(2.0,0)
+      3: w4(3.0,0)
+      4: w4(4.0,0)
+      5: w4(5.0,0)
+      6: w4(6.0,0)
+      7: w4(7.0,0)
+      8: w4(8.0,0)
+      bar: w2(2.0,5)
+      foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
+    Backlogs:
+      0: [w4]
+      4: [w3]
+      5: [w1, w2]
+
+min_score
+---------
+
+min_score is mainly provided as a tool to balance work across
+skills. The algorithm favors giving work to skilled workers.
+If one worker handles a large number of request classes, relative to
+other workers, it might perform sub-optimally, but if load is too low
+to force it to its maximum backlog, it won't transfer work to other
+workers. min_score provides a tool to help with this.  If a worker has
+a low score and only a modest backlog, it won't be used.
+
+To see this, let's reduce w3's backlog:
+
+    >>> pool.put(w3); pool.put(w3)
+
+but set the min_score to 4:
+
+    >>> settings['min_score'] = 4.0
+
+And then get a worker for foo:
+
+    >>> pool.get('foo')
+    w4
+    >>> pool
+    Request classes:
+      1: w4(1.0,1)
+      2: w4(2.0,1)
+      3: w4(3.0,1)
+      4: w4(4.0,1)
+      5: w4(5.0,1)
+      6: w4(6.0,1)
+      7: w4(7.0,1)
+      8: w4(8.0,1)
+      bar: w2(2.0,5)
+      foo: w2(2.0,5), w3(3.8,2), w4(4.0,1), w1(6.0,5)
+    Backlogs:
+      1: [w4]
+      2: [w3]
+      5: [w1, w2]
+
+We get the unskilled w4 because w1 and w2 are at their maximum
+backlogs, and w3 has a backlog of 2 and a score of only 3.8.
+
+Note that w4 is assigned a skill for foo of 4, which is min_score.
+
+XXX It's unclear if min_score provides much, if any, benefit.
+
+Worker disconnect
+=================
+
 When a worker disconnect, it's removed from the pool:
 
     >>> pool.remove(w1)
     >>> pool.remove(w3)
     >>> pool
     Request classes:
-      bar: w2(10.0,5)
-      foo: w2(10.0,5)
+      1: w4(1.0,1)
+      2: w4(2.0,1)
+      3: w4(3.0,1)
+      4: w4(4.0,1)
+      5: w4(5.0,1)
+      6: w4(6.0,1)
+      7: w4(7.0,1)
+      8: w4(8.0,1)
+      bar: w2(2.0,5)
+      foo: w2(2.0,5), w4(4.0,1)
     Backlogs:
+      1: [w4]
       5: [w2]



More information about the checkins mailing list