[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Replaced the lb algorithm with one that computes a maximum backlog

Jim Fulton jim at zope.com
Sun Mar 4 20:21:22 UTC 2012

Log message for revision 124506:
  Replaced the lb algorithm with one that computes a maximum backlog
  based on variance from the pool mean backlog, with backlogs and mean
  backlogs being smoothed using a moving average based on decayed sums
  and counts.

  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/zk.py

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-03-04 20:21:20 UTC (rev 124506)
@@ -87,20 +87,35 @@
 class Pool:
-    def __init__(self, max_backlog=40, unskilled_score=1.0):
-        self.max_backlog = max_backlog
+    def __init__(self, unskilled_score=1.0, variance=4.0, backlog_history=9):
         self.unskilled_score = unskilled_score
+        self.variance = variance
+        self.backlog_history = backlog_history
+        self._update_worker_decay()
         self.workers = set()
+        self.nworkers = 0
         self.unskilled = llist.dllist()
         self.skilled = {}   # rclass -> {[(score, workers)]}
         self.event = gevent.event.Event()
+        _init_backlog(self)
     def update_settings(self, settings):
-        for name in ('max_backlog', 'unskilled_score'):
+        for name in ('unskilled_score', 'variance', 'backlog_history'):
             if name in settings:
                 setattr(self, name, settings[name])
+        if 'backlog_history' in settings:
+            self._update_worker_decay()
+            self._update_decay()
+    def _update_decay(self):
+        if self.nworkers:
+            self.decay = 1.0 - 1.0/(self.backlog_history*2*self.nworkers)
+    def _update_worker_decay(self):
+        self.worker_decay = 1.0 - 1.0/(self.backlog_history*2)
     def __repr__(self):
         outl = []
         out = outl.append
@@ -110,11 +125,15 @@
                 % (rclass,
                    ', '.join(
                        '%s(%s,%s)' %
-                       (worker, score, worker.backlog)
+                       (worker, score, worker.mbacklog)
                        for (score, worker) in sorted(skilled)
+        out("  overall backlog: %s Decayed: %s Avg: %s" % (
+            self.backlog, self.mbacklog,
+            (self.mbacklog / self.nworkers) if self.nworkers else None
+            ))
         backlogs = {}
         for worker in self.workers:
             backlogs.setdefault(worker.backlog, []).append(worker)
@@ -122,24 +141,20 @@
             out('  %s: %r' % (backlog, sorted(workers)))
         return '\n'.join(outl)
-    def new_resume(self, worker, resume=None):
+    def new_resume(self, worker, resume):
         skilled = self.skilled
-        unskilled = self.unskilled
         workers = self.workers
         if worker in workers:
             for rclass, score in worker.resume.iteritems():
                 skilled[rclass].remove((score, worker))
-            if resume is None:
-                workers.remove(worker)
-                if worker.lnode is not None:
-                    unskilled.remove(worker.lnode)
-                    worker.lnode = None
-                return
-            worker.backlog = 0
+            _init_backlog(worker)
-            worker.lnode = unskilled.appendleft(worker)
+            self.nworkers = len(self.workers)
+            self._update_decay()
+            worker.lnode = self.unskilled.appendleft(worker)
+            self.event.set()
         worker.resume = resume
         for rclass, score in resume.iteritems():
@@ -148,11 +163,21 @@
             except KeyError:
                 skilled[rclass] = set(((score, worker), ))
-        if unskilled:
-            self.event.set()
+        logger.info('new resume\n%s', self)
     def remove(self, worker):
-        self.new_resume(worker)
+        skilled = self.skilled
+        for rclass, score in worker.resume.iteritems():
+            skilled[rclass].remove((score, worker))
+        if getattr(worker, 'lnode', None) is not None:
+            self.unskilled.remove(worker.lnode)
+            worker.lnode = None
+        self.workers.remove(worker)
+        self.nworkers = len(self.workers)
+        if self.nworkers:
+            self._update_decay()
+        else:
+            self.event.clear()
     def get(self, rclass, timeout=None):
         """Get a worker to handle a request class
@@ -164,61 +189,59 @@
                 return None
         # Look for a skilled worker
-        max_backlog = self.max_backlog
         best_score = 0
         best_worker = None
         skilled = self.skilled.get(rclass)
         if skilled is None:
             skilled = self.skilled[rclass] = set()
+        max_backlog = self.variance * max(self.mbacklog / self.nworkers, 2)
         for score, worker in skilled:
+            if worker.mbacklog > max_backlog:
+                continue
             backlog = worker.backlog + 1
-            if backlog > max_backlog:
-                continue
             score /= backlog
             if (score > best_score):
                 best_score = score
                 best_worker = worker
         if not best_score:
-            while unskilled.first.value.backlog >= max_backlog:
-                # Edge case.  max_backlog was reduced after a worker
-                # with a larger backlog was added.
-                #import pdb; pdb.set_trace()
-                unskilled.first.value.lnode = None
-                unskilled.popleft()
-                if not unskilled:
-                    # OK, now we need to wait. Just start over.
-                    return self.get(rclass, timeout)
             best_worker = unskilled.first.value
             if rclass not in best_worker.resume:
                 # We now have an unskilled worker and we need to
                 # assign it a score.
-                # - We want to give it work somewhat gradually.
-                # - We got here because:
-                #   - there are no skilled workers,
-                #   - The skilled workers have all reached their max backlog
                 score = self.unskilled_score
                 best_worker.resume[rclass] = score
                 skilled.add((score, best_worker))
+        # Move worker from lru to mru end of queue
+        best_worker.lnode = unskilled.append(best_worker)
         best_worker.backlog += 1
-        if best_worker.backlog < max_backlog:
-            best_worker.lnode = unskilled.append(best_worker)
-        else:
-            best_worker.lnode = None
+        _decay_backlog(best_worker, self.worker_decay)
+        self.backlog += 1
+        _decay_backlog(self, self.decay)
         return best_worker
     def put(self, worker):
-        if worker.lnode is None:
-            worker.lnode = self.unskilled.append(worker)
-            self.event.set()
-        if worker.backlog:
+        self.backlog -= 1
+        assert self.backlog >= 0, self.backlog
+        _decay_backlog(self, self.decay)
+        if worker.backlog > 0:
             worker.backlog -= 1
+            _decay_backlog(worker, self.worker_decay)
+def _init_backlog(worker):
+    worker.backlog = worker.nbacklog = worker.dbacklog = worker.mbacklog = 0
+def _decay_backlog(worker, decay):
+    worker.dbacklog = worker.dbacklog*decay + worker.backlog
+    worker.nbacklog = worker.nbacklog*decay + 1
+    worker.mbacklog = worker.dbacklog / worker.nbacklog
 class Worker(zc.resumelb.util.Worker):
     maxrno = (1<<32) - 1

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-03-04 20:21:20 UTC (rev 124506)
@@ -211,12 +211,13 @@
 Workers use their addresses as their reprs.
-    >>> print lb.pool
+    >>> print lb.pool # doctest: +ELLIPSIS
     Request classes:
-      h1.com:,0)
-      h2.com:,0)
+      h1.com:,0.970...)
+      h2.com:,0.485...)
-      0: [,]
+      overall backlog: 0 Decayed: 1.47809138734 Avg: 0.73904569367
+      0: [,]
 Worker disconnection

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-03-04 20:21:20 UTC (rev 124506)
@@ -15,8 +15,27 @@
 We'll test the pool with stand-ins for the local workers.
     >>> import zc.resumelb.lb
-    >>> pool = zc.resumelb.lb.Pool()
+    >>> pool = zc.resumelb.lb.Pool(
+    ...     variance=2, backlog_history=2, unskilled_score=.5)
+We specified a number of optional parameters that we'll see in action
+   How many times the pool mean backlog we'll let a worker's backlog
+   rise before we look for another worker. The default is 4.
+   The (approximate) number of requests to include in the mean backlog
+   for a worker. The default is 9.
+   The pool also has a backlog history, which is the product of the
+   worker backlog_history and the number of workers.
+   The score to assign to unskilled workers. The default is 1.
 The get method is used to get a worker from the pool.  A request class
 and an optional timeout is passed. (The timeout is mainly useful for
@@ -64,20 +83,24 @@
  We're gonna be white box and look at the pool data structures from
  time to time.
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(1.0,1)
-      foo: w1(1.0,2)
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,1.57...)
+      overall backlog: 3 Decayed: 2.08... Avg: 1.04...
       1: [w2]
       2: [w1]
 Here, we can see that w1 is used for the foo class and w2 for the bar
-class.  In the request classes, the worker's score and it's overall
-backlog if shown in paretheses.  We see that both workers have a score
+class.  In the request classes, the worker's score and its decayed
+backlog is shown in parentheses.  We see that both workers have a score
 of 1.0.  This is the default score for new workers.  We'll say more
 about this later.
+The decayed backlog, for the pool, and for workers, is an "average"
+backlog over a backlog history, which is a configuration parameter.
 Let's add another worker:
     >>> w3 = Worker('w3')
@@ -85,114 +108,133 @@
 and make some more foo requests:
-    >>> [pool.get('foo') for i in range(3)]
-    [w1, w1, w1]
+    >>> [pool.get('foo') for i in range(4)]
+    [w1, w1, w1, w1]
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(1.0,1)
-      foo: w1(1.0,5)
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,4.29907929908)
+      overall backlog: 7 Decayed: 4.39137888071 Avg: 1.46379296024
       0: [w3]
       1: [w2]
-      5: [w1]
+      6: [w1]
 Even though we still had a worker with no backlog, we kept sending
-requests to w1.  This is because w1 hasn't reached it's maximum
-backlog.  Also, it's score is greater than the min score, which
-defaults to 1.0.  Let's reduce the maximum backlog to 5:
+requests to w1.  This is because w1 hasn't reached the maximum
+backlog, which is:
-    >>> pool.max_backlog = 5
+   variance * max(2, pool_mean_backlog)
 So now, w1 has reached it's maximum backlog.  If
-we make another foo request, we'll start using w3, and when that's
-reached it's maximum backlog, we'll start using w2:
+we make another foo request, we'll start using w3:
-    >>> [pool.get('foo') for i in range(7)]
-    [w3, w3, w3, w3, w3, w2, w2]
+    >>> [pool.get('foo') for i in range(10)]
+    [w3, w3, w3, w3, w3, w3, w1, w3, w1, w3]
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(1.0,3)
-      foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,5.89000423908), w3(0.5,5.89000423908)
-      3: [w2]
-      5: [w1, w3]
+      overall backlog: 17 Decayed: 11.0516220545 Avg: 3.68387401818
+      1: [w2]
+      8: [w1, w3]
-If we get all workers to the maximum backlog, we'll block until a
-worker is free.
+Something interesting happened here.  After several requests, the
+pool switched back and forth between w1 and w3.  It never switched to
+w2.  This is because, as the backlogs for w1 and w3 increased, so did
+the mean backlog.  This is useful, because it prevents a single
+request class from taking over the entire pool.  Let's see what
+happens when we add some new workers:
+    >>> w4 = Worker('w4')
+    >>> pool.new_resume(w4, {})
+    >>> w5 = Worker('w5')
+    >>> pool.new_resume(w5, {})
+    >>> pool # doctest: +ELLIPSIS
+    Request classes:
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,5.89000423908), w3(0.5,5.89000423908)
+    Backlogs:
+      overall backlog: 17 Decayed: 11.0516220545 Avg: 2.21032441091
+      0: [w4, w5]
+      1: [w2]
+      8: [w1, w3]
     >>> [pool.get('foo') for i in range(2)]
-    [w2, w2]
+    [w5, w5]
-    >>> pool.get('foo', 0.0)
+The new workers caused the overall mean to drop and when we got a foo
+request, it went to the newest new worker.  The subsequent request
+went to that worker because it was skilled and had a fairly low backlog.
 When a worker is done doing it's work, we put it back in the pool:
-    >>> pool.put(w1)
-    >>> pool.put(w1)
-    >>> pool.put(w1)
-    >>> pool.put(w2)
-    >>> pool.put(w3)
-    >>> pool.put(w3)
-    >>> pool
+    >>> for i in range(8):
+    ...     pool.put(w1)
+    >>> for i in range(8):
+    ...     pool.put(w3)
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(1.0,4)
-      foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,2.45398560273), w3(0.5,2.45398560273), w5(0.5,1.57142857143)
-      2: [w1]
-      3: [w3]
-      4: [w2]
+      overall backlog: 3 Decayed: 10.2983868199 Avg: 2.05967736397
+      0: [w1, w3, w4]
+      1: [w2]
+      2: [w5]
 Now, when we get a worker, we'll get w1.
     >>> pool.get('foo', 0.0)
-Why? We adjust each score by the worker's backlog, so even though all
-2 workers had the same score, w1 is chosen because it has the smallest
 Now that we've done some work, let's update the resumes.  This will
 normally be done by workers periodically, after collecting performance
     >>> pool.new_resume(w1, {'foo': 6.0})
     >>> pool.new_resume(w2, {'bar': 2.0, 'foo': 2.0})
     >>> pool.new_resume(w3, {'foo': 3.8})
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(2.0,4)
-      foo: w2(2.0,4), w3(3.8,3), w1(6.0,3)
+      bar: w2(2.0,1.0)
+      foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,2.45...), w1(6.0,2.08...)
-      3: [w1, w3]
-      4: [w2]
+      overall backlog: 4 Decayed: 9.90317056202 Avg: 1.9806341124
+      0: [w3, w4]
+      1: [w1, w2]
+      2: [w5]
     >>> pool.get('foo')
+    w3
+    >>> pool.get('foo')
     >>> pool.get('foo')
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(2.0,4)
-      foo: w2(2.0,4), w3(3.8,3), w1(6.0,5)
+      bar: w2(2.0,1.0)
+      foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,2.08...), w1(6.0,2.30...)
-      3: [w3]
-      4: [w2]
-      5: [w1]
+      overall backlog: 7 Decayed: 9.23495653237 Avg: 1.84699130647
+      0: [w4]
+      1: [w2, w3]
+      2: [w5]
+      3: [w1]
-Because w1 has reached the maximum backlog, it's out of the running.
     >>> pool.get('foo')
     >>> pool.get('foo')
+    w1
+    >>> pool.get('foo')
-    >>> pool.get('foo')
-    w2
     >>> pool.put(w1)
     >>> pool.put(w3)
@@ -200,37 +242,20 @@
     >>> pool.put(w3)
     >>> pool.put(w3)
     >>> pool.put(w3)
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(2.0,5)
-      foo: w2(2.0,5), w3(3.8,0), w1(6.0,4)
+      bar: w2(2.0,1.0)
+      foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,1.43...), w1(6.0,2.79...)
-      0: [w3]
-      4: [w1]
-      5: [w2]
+      overall backlog: 4 Decayed: 8.35... Avg: 1.67...
+      0: [w3, w4]
+      1: [w2]
+      2: [w5]
+      3: [w1]
     >>> [pool.get('foo') for i in range(5)]
-    [w3, w3, w3, w1, w3]
+    [w3, w3, w1, w3, w1]
-Pool settings
-There are several settings that effect pools:
-  Maximum worker backlog, defaulting to 40.
-  A worker won't be used if it has a backlog greater than 1 and it's
-  score is less than min_score.
-  The score assigned to workers when given a new request class.  This
-  defaults to min_score.
-We've already seen max_backlog at work.  Let's test the other
 Worker disconnect
@@ -238,9 +263,37 @@
     >>> pool.remove(w1)
     >>> pool.remove(w3)
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(2.0,5)
-      foo: w2(2.0,5)
+      bar: w2(2.0,1.0)
+      foo: w5(0.5,1.57142857143), w2(2.0,1.0)
-      5: [w2]
+      overall backlog: 9 Decayed: 8.043843598 Avg: 2.68128119933
+      0: [w4]
+      1: [w2]
+      2: [w5]
+Updating worker settings
+    >>> pool = zc.resumelb.lb.Pool()
+    >>> pool.new_resume(w1, {'foo': 6.0})
+    >>> pool.new_resume(w2, {'bar': 2.0, 'foo': 2.0})
+    >>> pool.new_resume(w3, {'foo': 3.8})
+    >>> pool.variance, pool.backlog_history, pool.unskilled_score
+    (4.0, 9, 1.0)
+    >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+    (0.944..., 0.981...)
+    >>> pool.update_settings(dict(variance=2.0))
+    >>> pool.variance, pool.backlog_history, pool.unskilled_score
+    (2.0, 9, 1.0)
+    >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+    (0.944..., 0.981...)
+    >>> pool.update_settings(
+    ... dict(variance=3.0, backlog_history=6, unskilled_score=.25))
+    >>> pool.variance, pool.backlog_history, pool.unskilled_score
+    (3.0, 6, 0.25)
+    >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+    (0.916..., 0.972...)

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-03-04 20:21:20 UTC (rev 124506)
@@ -25,6 +25,7 @@
            threads=None, backdoor=False, run=True, **kw):
     """Paste deploy server runner
+    # XXX support log level
     if loggers:
         import ZConfig
@@ -123,6 +124,7 @@
+    # XXX default to basic config?
     if options.logger_configuration:
         import ZConfig
         with open(options.logger_configuration) as f:

More information about the checkins mailing list