[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Replaced the lb algorithm with one that computes a maximum backlog

Jim Fulton jim at zope.com
Sun Mar 4 20:21:22 UTC 2012


Log message for revision 124506:
  Replaced the lb algorithm with one that computes a maximum backlog
  based on variance from the pool mean backlog, with backlogs and mean
  backlogs being smoothed using a moving average based on decayed sums
  and counts.
  

Changed:
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/zk.py

-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-03-04 20:21:20 UTC (rev 124506)
@@ -87,20 +87,35 @@
 
 class Pool:
 
-    def __init__(self, max_backlog=40, unskilled_score=1.0):
-        self.max_backlog = max_backlog
+    def __init__(self, unskilled_score=1.0, variance=4.0, backlog_history=9):
         self.unskilled_score = unskilled_score
+        self.variance = variance
+        self.backlog_history = backlog_history
+        self._update_worker_decay()
 
         self.workers = set()
+        self.nworkers = 0
         self.unskilled = llist.dllist()
         self.skilled = {}   # rclass -> {[(score, workers)]}
         self.event = gevent.event.Event()
+        _init_backlog(self)
 
     def update_settings(self, settings):
-        for name in ('max_backlog', 'unskilled_score'):
+        for name in ('unskilled_score', 'variance', 'backlog_history'):
             if name in settings:
                 setattr(self, name, settings[name])
 
+        if 'backlog_history' in settings:
+            self._update_worker_decay()
+            self._update_decay()
+
+    def _update_decay(self):
+        if self.nworkers:
+            self.decay = 1.0 - 1.0/(self.backlog_history*2*self.nworkers)
+
+    def _update_worker_decay(self):
+        self.worker_decay = 1.0 - 1.0/(self.backlog_history*2)
+
     def __repr__(self):
         outl = []
         out = outl.append
@@ -110,11 +125,15 @@
                 % (rclass,
                    ', '.join(
                        '%s(%s,%s)' %
-                       (worker, score, worker.backlog)
+                       (worker, score, worker.mbacklog)
                        for (score, worker) in sorted(skilled)
                    ))
                 )
         out('Backlogs:')
+        out("  overall backlog: %s Decayed: %s Avg: %s" % (
+            self.backlog, self.mbacklog,
+            (self.mbacklog / self.nworkers) if self.nworkers else None
+            ))
         backlogs = {}
         for worker in self.workers:
             backlogs.setdefault(worker.backlog, []).append(worker)
@@ -122,24 +141,20 @@
             out('  %s: %r' % (backlog, sorted(workers)))
         return '\n'.join(outl)
 
-    def new_resume(self, worker, resume=None):
+    def new_resume(self, worker, resume):
         skilled = self.skilled
-        unskilled = self.unskilled
         workers = self.workers
 
         if worker in workers:
             for rclass, score in worker.resume.iteritems():
                 skilled[rclass].remove((score, worker))
-            if resume is None:
-                workers.remove(worker)
-                if worker.lnode is not None:
-                    unskilled.remove(worker.lnode)
-                    worker.lnode = None
-                return
         else:
-            worker.backlog = 0
+            _init_backlog(worker)
             workers.add(worker)
-            worker.lnode = unskilled.appendleft(worker)
+            self.nworkers = len(self.workers)
+            self._update_decay()
+            worker.lnode = self.unskilled.appendleft(worker)
+            self.event.set()
 
         worker.resume = resume
         for rclass, score in resume.iteritems():
@@ -148,11 +163,21 @@
             except KeyError:
                 skilled[rclass] = set(((score, worker), ))
 
-        if unskilled:
-            self.event.set()
+        logger.info('new resume\n%s', self)
 
     def remove(self, worker):
-        self.new_resume(worker)
+        skilled = self.skilled
+        for rclass, score in worker.resume.iteritems():
+            skilled[rclass].remove((score, worker))
+        if getattr(worker, 'lnode', None) is not None:
+            self.unskilled.remove(worker.lnode)
+            worker.lnode = None
+        self.workers.remove(worker)
+        self.nworkers = len(self.workers)
+        if self.nworkers:
+            self._update_decay()
+        else:
+            self.event.clear()
 
     def get(self, rclass, timeout=None):
         """Get a worker to handle a request class
@@ -164,61 +189,59 @@
                 return None
 
         # Look for a skilled worker
-        max_backlog = self.max_backlog
         best_score = 0
         best_worker = None
         skilled = self.skilled.get(rclass)
         if skilled is None:
             skilled = self.skilled[rclass] = set()
+
+        max_backlog = self.variance * max(self.mbacklog / self.nworkers, 2)
         for score, worker in skilled:
+            if worker.mbacklog > max_backlog:
+                continue
             backlog = worker.backlog + 1
-            if backlog > max_backlog:
-                continue
             score /= backlog
             if (score > best_score):
                 best_score = score
                 best_worker = worker
 
         if not best_score:
-            while unskilled.first.value.backlog >= max_backlog:
-                # Edge case.  max_backlog was reduced after a worker
-                # with a larger backlog was added.
-                #import pdb; pdb.set_trace()
-                unskilled.first.value.lnode = None
-                unskilled.popleft()
-                if not unskilled:
-                    # OK, now we need to wait. Just start over.
-                    return self.get(rclass, timeout)
-
             best_worker = unskilled.first.value
             if rclass not in best_worker.resume:
-
                 # We now have an unskilled worker and we need to
                 # assign it a score.
-                # - We want to give it work somewhat gradually.
-                # - We got here because:
-                #   - there are no skilled workers,
-                #   - The skilled workers have all reached their max backlog
                 score = self.unskilled_score
                 best_worker.resume[rclass] = score
                 skilled.add((score, best_worker))
 
+        # Move worker from lru to mru end of queue
         unskilled.remove(best_worker.lnode)
+        best_worker.lnode = unskilled.append(best_worker)
+
         best_worker.backlog += 1
-        if best_worker.backlog < max_backlog:
-            best_worker.lnode = unskilled.append(best_worker)
-        else:
-            best_worker.lnode = None
+        _decay_backlog(best_worker, self.worker_decay)
 
+        self.backlog += 1
+        _decay_backlog(self, self.decay)
+
         return best_worker
 
     def put(self, worker):
-        if worker.lnode is None:
-            worker.lnode = self.unskilled.append(worker)
-            self.event.set()
-        if worker.backlog:
+        self.backlog -= 1
+        assert self.backlog >= 0, self.backlog
+        _decay_backlog(self, self.decay)
+        if worker.backlog > 0:
             worker.backlog -= 1
+            _decay_backlog(worker, self.worker_decay)
 
+def _init_backlog(worker):
+    worker.backlog = worker.nbacklog = worker.dbacklog = worker.mbacklog = 0
+
+def _decay_backlog(worker, decay):
+    worker.dbacklog = worker.dbacklog*decay + worker.backlog
+    worker.nbacklog = worker.nbacklog*decay + 1
+    worker.mbacklog = worker.dbacklog / worker.nbacklog
+
 class Worker(zc.resumelb.util.Worker):
 
     maxrno = (1<<32) - 1

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-03-04 20:21:20 UTC (rev 124506)
@@ -211,12 +211,13 @@
 
 Workers use their addresses as their reprs.
 
-    >>> print lb.pool
+    >>> print lb.pool # doctest: +ELLIPSIS
     Request classes:
-      h1.com: 127.0.0.1:65027(10.0,0)
-      h2.com: 127.0.0.1:65028(10.0,0)
+      h1.com: 127.0.0.1:0(10.0,0.970...)
+      h2.com: 127.0.0.1:0(10.0,0.485...)
     Backlogs:
-      0: [127.0.0.1:65028, 127.0.0.1:65027]
+      overall backlog: 0 Decayed: 1.47809138734 Avg: 0.73904569367
+      0: [127.0.0.1:0, 127.0.0.1:0]
 
 
 Worker disconnection

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-03-04 20:21:20 UTC (rev 124506)
@@ -15,8 +15,27 @@
 We'll test the pool with stand-ins for the local workers.
 
     >>> import zc.resumelb.lb
-    >>> pool = zc.resumelb.lb.Pool()
+    >>> pool = zc.resumelb.lb.Pool(
+    ...     variance=2, backlog_history=2, unskilled_score=.5)
 
+We specified a number of optional parameters that we'll see in action
+later:
+
+variance
+   How many times the pool mean backlog we'll let a worker's backlog
+   rise before we look for another worker. The default is 4.
+
+backlog_history
+   The (approximate) number of requests to include in the mean backlog
+   for a worker. The default is 9.
+
+   The pool also has a backlog history, which is the product of the
+   worker backlog_history and the number of workers.
+
+unskilled_score
+   The score to assign to unskilled workers. The default is 1.
+
+
 The get method is used to get a worker from the pool.  A request class
 and an optional timeout is passed. (The timeout is mainly useful for
 testing.)
@@ -64,20 +83,24 @@
  We're gonna be white box and look at the pool data structures from
  time to time.
 
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(1.0,1)
-      foo: w1(1.0,2)
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,1.57...)
     Backlogs:
+      overall backlog: 3 Decayed: 2.08... Avg: 1.04...
       1: [w2]
       2: [w1]
 
 Here, we can see that w1 is used for the foo class and w2 for the bar
-class.  In the request classes, the worker's score and it's overall
-backlog if shown in paretheses.  We see that both workers have a score
+class.  In the request classes, the worker's score and its decayed
+backlog are shown in parentheses.  We see that both workers have a score
 of 1.0.  This is the default score for new workers.  We'll say more
 about this later.
 
+The decayed backlog, for the pool, and for workers, is an "average"
+backlog over a backlog history, which is a configuration parameter.
+
 Let's add another worker:
 
     >>> w3 = Worker('w3')
@@ -85,114 +108,133 @@
 
 and make some more foo requests:
 
-    >>> [pool.get('foo') for i in range(3)]
-    [w1, w1, w1]
+    >>> [pool.get('foo') for i in range(4)]
+    [w1, w1, w1, w1]
 
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(1.0,1)
-      foo: w1(1.0,5)
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,4.29907929908)
     Backlogs:
+      overall backlog: 7 Decayed: 4.39137888071 Avg: 1.46379296024
       0: [w3]
       1: [w2]
-      5: [w1]
+      6: [w1]
 
 Even though we still had a worker with no backlog, we kept sending
-requests to w1.  This is because w1 hasn't reached it's maximum
-backlog.  Also, it's score is greater than the min score, which
-defaults to 1.0.  Let's reduce the maximum backlog to 5:
+requests to w1.  This is because w1 hasn't reached the maximum
+backlog, which is:
 
-    >>> pool.max_backlog = 5
+   variance * max(2, pool_mean_backlog)
 
 So now, w1 has reached it's maximum backlog.  If
-we make another foo request, we'll start using w3, and when that's
-reached it's maximum backlog, we'll start using w2:
+we make another foo request, we'll start using w3:
 
-    >>> [pool.get('foo') for i in range(7)]
-    [w3, w3, w3, w3, w3, w2, w2]
+    >>> [pool.get('foo') for i in range(10)]
+    [w3, w3, w3, w3, w3, w3, w1, w3, w1, w3]
 
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(1.0,3)
-      foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,5.89000423908), w3(0.5,5.89000423908)
     Backlogs:
-      3: [w2]
-      5: [w1, w3]
+      overall backlog: 17 Decayed: 11.0516220545 Avg: 3.68387401818
+      1: [w2]
+      8: [w1, w3]
 
-If we get all workers to the maximum backlog, we'll block until a
-worker is free.
+Something interesting happened here.  After several requests, the
+pool switched back and forth between w1 and w3.  It never switched to
+w2.  This is because, as the backlogs for w1 and w3 increased, so did
+the mean backlog.  This is useful, because it prevents a single
+request class from taking over the entire pool.  Let's see what
+happens when we add some new workers:
 
+    >>> w4 = Worker('w4')
+    >>> pool.new_resume(w4, {})
+    >>> w5 = Worker('w5')
+    >>> pool.new_resume(w5, {})
+
+    >>> pool # doctest: +ELLIPSIS
+    Request classes:
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,5.89000423908), w3(0.5,5.89000423908)
+    Backlogs:
+      overall backlog: 17 Decayed: 11.0516220545 Avg: 2.21032441091
+      0: [w4, w5]
+      1: [w2]
+      8: [w1, w3]
+
     >>> [pool.get('foo') for i in range(2)]
-    [w2, w2]
+    [w5, w5]
 
-    >>> pool.get('foo', 0.0)
+The new workers caused the overall mean to drop and when we got a foo
+request, it went to the newest new worker.  The subsequent request
+went to that worker because it was skilled and had a fairly low backlog.
 
 When a worker is done doing it's work, we put it back in the pool:
 
-    >>> pool.put(w1)
-    >>> pool.put(w1)
-    >>> pool.put(w1)
-    >>> pool.put(w2)
-    >>> pool.put(w3)
-    >>> pool.put(w3)
-    >>> pool
+    >>> for i in range(8):
+    ...     pool.put(w1)
+    >>> for i in range(8):
+    ...     pool.put(w3)
+
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(1.0,4)
-      foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
+      bar: w2(0.5,1.0)
+      foo: w1(0.5,2.45398560273), w3(0.5,2.45398560273), w5(0.5,1.57142857143)
     Backlogs:
-      2: [w1]
-      3: [w3]
-      4: [w2]
+      overall backlog: 3 Decayed: 10.2983868199 Avg: 2.05967736397
+      0: [w1, w3, w4]
+      1: [w2]
+      2: [w5]
 
 Now, when we get a worker, we'll get w1.
 
     >>> pool.get('foo', 0.0)
     w1
 
-Why? We adjust each score by the worker's backlog, so even though all
-2 workers had the same score, w1 is chosen because it has the smallest
-backlog.
-
 Now that we've done some work, let's update the resumes.  This will
 normally be done by workers periodically, after collecting performance
 data.
 
     >>> pool.new_resume(w1, {'foo': 6.0})
-
     >>> pool.new_resume(w2, {'bar': 2.0, 'foo': 2.0})
-
     >>> pool.new_resume(w3, {'foo': 3.8})
 
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(2.0,4)
-      foo: w2(2.0,4), w3(3.8,3), w1(6.0,3)
+      bar: w2(2.0,1.0)
+      foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,2.45...), w1(6.0,2.08...)
     Backlogs:
-      3: [w1, w3]
-      4: [w2]
+      overall backlog: 4 Decayed: 9.90317056202 Avg: 1.9806341124
+      0: [w3, w4]
+      1: [w1, w2]
+      2: [w5]
 
     >>> pool.get('foo')
+    w3
+    >>> pool.get('foo')
     w1
     >>> pool.get('foo')
     w1
 
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(2.0,4)
-      foo: w2(2.0,4), w3(3.8,3), w1(6.0,5)
+      bar: w2(2.0,1.0)
+      foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,2.08...), w1(6.0,2.30...)
     Backlogs:
-      3: [w3]
-      4: [w2]
-      5: [w1]
+      overall backlog: 7 Decayed: 9.23495653237 Avg: 1.84699130647
+      0: [w4]
+      1: [w2, w3]
+      2: [w5]
+      3: [w1]
 
-Because w1 has reached the maximum backlog, it's out of the running.
-
     >>> pool.get('foo')
     w3
     >>> pool.get('foo')
+    w1
+    >>> pool.get('foo')
     w3
-    >>> pool.get('foo')
-    w2
 
     >>> pool.put(w1)
     >>> pool.put(w3)
@@ -200,37 +242,20 @@
     >>> pool.put(w3)
     >>> pool.put(w3)
     >>> pool.put(w3)
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(2.0,5)
-      foo: w2(2.0,5), w3(3.8,0), w1(6.0,4)
+      bar: w2(2.0,1.0)
+      foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,1.43...), w1(6.0,2.79...)
     Backlogs:
-      0: [w3]
-      4: [w1]
-      5: [w2]
+      overall backlog: 4 Decayed: 8.35... Avg: 1.67...
+      0: [w3, w4]
+      1: [w2]
+      2: [w5]
+      3: [w1]
 
     >>> [pool.get('foo') for i in range(5)]
-    [w3, w3, w3, w1, w3]
+    [w3, w3, w1, w3, w1]
 
-Pool settings
-=============
-
-There are several settings that effect pools:
-
-max_backlog
-  Maximum worker backlog, defaulting to 40.
-
-min_score
-  A worker won't be used if it has a backlog greater than 1 and it's
-  score is less than min_score.
-
-unskilled_score
-  The score assigned to workers when given a new request class.  This
-  defaults to min_score.
-
-We've already seen max_backlog at work.  Let's test the other
-settings.
-
 Worker disconnect
 =================
 
@@ -238,9 +263,37 @@
 
     >>> pool.remove(w1)
     >>> pool.remove(w3)
-    >>> pool
+    >>> pool # doctest: +ELLIPSIS
     Request classes:
-      bar: w2(2.0,5)
-      foo: w2(2.0,5)
+      bar: w2(2.0,1.0)
+      foo: w5(0.5,1.57142857143), w2(2.0,1.0)
     Backlogs:
-      5: [w2]
+      overall backlog: 9 Decayed: 8.043843598 Avg: 2.68128119933
+      0: [w4]
+      1: [w2]
+      2: [w5]
+
+Updating worker settings
+========================
+
+    >>> pool = zc.resumelb.lb.Pool()
+    >>> pool.new_resume(w1, {'foo': 6.0})
+    >>> pool.new_resume(w2, {'bar': 2.0, 'foo': 2.0})
+    >>> pool.new_resume(w3, {'foo': 3.8})
+    >>> pool.variance, pool.backlog_history, pool.unskilled_score
+    (4.0, 9, 1.0)
+    >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+    (0.944..., 0.981...)
+
+    >>> pool.update_settings(dict(variance=2.0))
+    >>> pool.variance, pool.backlog_history, pool.unskilled_score
+    (2.0, 9, 1.0)
+    >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+    (0.944..., 0.981...)
+
+    >>> pool.update_settings(
+    ... dict(variance=3.0, backlog_history=6, unskilled_score=.25))
+    >>> pool.variance, pool.backlog_history, pool.unskilled_score
+    (3.0, 6, 0.25)
+    >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+    (0.916..., 0.972...)

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-03-04 20:21:20 UTC (rev 124506)
@@ -25,6 +25,7 @@
            threads=None, backdoor=False, run=True, **kw):
     """Paste deploy server runner
     """
+    # XXX support log level
     if loggers:
         import ZConfig
         ZConfig.configureLoggers(loggers)
@@ -123,6 +124,7 @@
         else:
             return
 
+    # XXX default to basic config?
     if options.logger_configuration:
         import ZConfig
         with open(options.logger_configuration) as f:



More information about the checkins mailing list