[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Refactored workers to use a decayed performance statistics rather than

Jim Fulton jim at zope.com
Sun Mar 4 13:28:50 UTC 2012


Log message for revision 124503:
  Refactored workers to use a decayed performance statistics rather than
  a time ring to compute their resumes. This has a number of advantages:
  
  - Can handle much larger resumes
  - Can decouple update of resumes from retirement of unused request
    classes.
  - Gradual de-emphasis of older samples.
  - Resume is always up-to-date
  - Resume computation is spread over all requests.
  
  Got rid of target-resume-size approach to spreading load in lb pool.
  This doesn't work out in practice because different request classes
  often have very different impact so resume size alone isn't really
  reasonable.
  
  Also got rid of dubious min_score.
  
  The next lb refactoring (already done in performance testing) will
  provide much more rational distribution of work under high load.
  
  We still need an approach for better allocation of load under low load.
  

Changed:
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/zk.test

-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-03-04 09:56:21 UTC (rev 124502)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-03-04 13:28:48 UTC (rev 124503)
@@ -87,11 +87,8 @@
 
 class Pool:
 
-    def __init__(self, redundancy=1, max_backlog=40, min_score=1.0,
-                 unskilled_score=None):
-        self.redundancy = redundancy
+    def __init__(self, max_backlog=40, unskilled_score=1.0):
         self.max_backlog = max_backlog
-        self.min_score = min_score
         self.unskilled_score = unskilled_score
 
         self.workers = set()
@@ -100,8 +97,7 @@
         self.event = gevent.event.Event()
 
     def update_settings(self, settings):
-        for name in ('redundancy', 'max_backlog',
-                     'min_score', 'unskilled_score'):
+        for name in ('max_backlog', 'unskilled_score'):
             if name in settings:
                 setattr(self, name, settings[name])
 
@@ -131,9 +127,6 @@
         unskilled = self.unskilled
         workers = self.workers
 
-        target_skills_per_worker = (
-            1 + self.redundancy * len(skilled) / (len(workers) or 1))
-
         if worker in workers:
             for rclass, score in worker.resume.iteritems():
                 skilled[rclass].remove((score, worker))
@@ -148,13 +141,8 @@
             workers.add(worker)
             worker.lnode = unskilled.appendleft(worker)
 
-        resumeitems = resume.items()
-        drop = (len(resume) - target_skills_per_worker) / 2
-        if drop > 0:
-            resumeitems = sorted(resumeitems, key=lambda i: i[1])[drop:]
-
-        worker.resume = dict(resumeitems)
-        for rclass, score in resumeitems:
+        worker.resume = resume
+        for rclass, score in resume.iteritems():
             try:
                 skilled[rclass].add((score, worker))
             except KeyError:
@@ -177,7 +165,6 @@
 
         # Look for a skilled worker
         max_backlog = self.max_backlog
-        min_score = self.min_score
         best_score = 0
         best_worker = None
         skilled = self.skilled.get(rclass)
@@ -185,21 +172,8 @@
             skilled = self.skilled[rclass] = set()
         for score, worker in skilled:
             backlog = worker.backlog + 1
-            if backlog > 2:
-                if (
-                    # Don't let a worker get too backed up
-                    backlog > max_backlog or
-
-                    # We use min score as a way of allowing other workers
-                    # a chance to pick up work even if the skilled workers
-                    # haven't reached their backlog.  This is mainly a tuning
-                    # tool for when a worker is doing OK, but maybe still
-                    # doing too much.
-                    (score < min_score and
-                     unskilled and unskilled.first.value.backlog == 0
-                     )
-                    ):
-                    continue
+            if backlog > max_backlog:
+                continue
             score /= backlog
             if (score > best_score):
                 best_score = score
@@ -221,20 +195,11 @@
 
                 # We now have an unskilled worker and we need to
                 # assign it a score.
-                # - It has to be >= min score, or it won't get future work.
                 # - We want to give it work somewhat gradually.
                 # - We got here because:
                 #   - there are no skilled workers,
-                #   - The skilled workers have all either:
-                #     - Eached their max backlog, or
-                #     - Have scores > min score
-                # Let's set it to min score because either:
-                # - There are no skilled workers, so they'll all get the same
-                # - Other workers are maxed out, or
-                # - The score will be higher than some the existing, so it'll
-                #   get work
-                # We also allow for an unskilled_score setting to override.
-                score = self.unskilled_score or self.min_score
+                #   - The skilled workers have all reached their max backlog
+                score = self.unskilled_score
                 best_worker.resume[rclass] = score
                 skilled.add((score, best_worker))
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-03-04 09:56:21 UTC (rev 124502)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-03-04 13:28:48 UTC (rev 124503)
@@ -6,15 +6,13 @@
 implements the load balancing algorithm.  The pool has a collection of
 workers organized according to their resumes.
 
-The load balancer works by accepting remote worker connections and
-adding local workers to the pool, and by accepting wsgi request,
-getting local workers from the pool and passing the wsgi requests to
-the local workers, which, in term, forwward the requests to the remote
-workers.
+The load balancer works by connecting to workers, creating local
+workers for each connection, adding local workers to the pool, and
+by accepting wsgi request, getting local workers from the pool and
+passing the wsgi requests to the local workers, which, in term,
+forwward the requests to the remote workers.
 
-We'll test the pool with stand-ins for the local workers.  The pool
-constructor takes a settings mapping object.  This allows the settings
-to be managed in real time.
+We'll test the pool with stand-ins for the local workers.
 
     >>> import zc.resumelb.lb
     >>> pool = zc.resumelb.lb.Pool()
@@ -43,10 +41,6 @@
 
     >>> pool.new_resume(w1, {})
 
-As far as the pool is concerned, any object that can be in a set or be
-used as a dictionary key can be used as a worker.  The pool doesn't
-care.  The pool does add some extra attrobutes to workers.
-
    >>> pool.get('foo', 0.0)
    w1
 
@@ -223,9 +217,6 @@
 
 There are several settings that effect pools:
 
-redundancy
-  Target number of workers for each request class, defaulting to 1.
-
 max_backlog
   Maximum worker backlog, defaulting to 40.
 
@@ -240,123 +231,6 @@
 We've already seen max_backlog at work.  Let's test the other
 settings.
 
-redundancy
-----------
-
-Given a redundancy, we can compute an expected number of request
-classes per worker (resumne size), which is the number of request
-classes divided by the number of workers times the redundancy.  With
-special handling for no workers or request classes, this works out
-to::
-
-  1 + redundancy * n_request_classes / max(nworkers, 1)
-
-When we get a new resume and it is larger than the expected size, we
-discard half of the excess number of items with the lowest score.
-Given the pool data:
-
-    >>> pool
-    Request classes:
-      bar: w2(2.0,5)
-      foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
-    Backlogs:
-      4: [w3]
-      5: [w1, w2]
-
-We see there are 2 request classes and 3 workers, so we expect one
-request class per worker.
-
-Let's add a new worker with a much larger resume:
-
-    >>> w4 = Worker('w4')
-    >>> pool.new_resume(w4, dict((str(i), float(i)) for i in range(9)))
-
-When we look at the pool, we see that 4 of the items were discarded:
-
-    >>> pool
-    Request classes:
-      4: w4(4.0,0)
-      5: w4(5.0,0)
-      6: w4(6.0,0)
-      7: w4(7.0,0)
-      8: w4(8.0,0)
-      bar: w2(2.0,5)
-      foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
-    Backlogs:
-      0: [w4]
-      4: [w3]
-      5: [w1, w2]
-
-Now we have 7 request classes and 4 workers.  If we set redundancy to
-3, then the expected resume size is 6, so::
-
-    >>> pool.redundancy = 3
-    >>> pool.new_resume(w4, dict((str(i), float(i)) for i in range(9)))
-
-    >>> pool
-    Request classes:
-      1: w4(1.0,0)
-      2: w4(2.0,0)
-      3: w4(3.0,0)
-      4: w4(4.0,0)
-      5: w4(5.0,0)
-      6: w4(6.0,0)
-      7: w4(7.0,0)
-      8: w4(8.0,0)
-      bar: w2(2.0,5)
-      foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
-    Backlogs:
-      0: [w4]
-      4: [w3]
-      5: [w1, w2]
-
-min_score
----------
-
-min_score is mainly provided as a tool to balance work accross
-skills. The algorithm favors giving work to skilled workers.
-If one worker handles a large number of request classes, relative to
-other workers, it might perform sub-optimally, but if load is too low
-to force it to it's maximum backlog, it won't transfer work to other
-workers. min_score provides a tool to help with this.  If a worker has
-a low score and only a modest backlog, it won't be used.
-
-To see this, let's reduce w3's backlog:
-
-    >>> pool.put(w3); pool.put(w3)
-
-but set the min_score to 4:
-
-    >>> pool.min_score = 4.0
-
-And the get a worker for foo:
-
-    >>> pool.get('foo')
-    w4
-    >>> pool
-    Request classes:
-      1: w4(1.0,1)
-      2: w4(2.0,1)
-      3: w4(3.0,1)
-      4: w4(4.0,1)
-      5: w4(5.0,1)
-      6: w4(6.0,1)
-      7: w4(7.0,1)
-      8: w4(8.0,1)
-      bar: w2(2.0,5)
-      foo: w2(2.0,5), w3(3.8,2), w4(4.0,1), w1(6.0,5)
-    Backlogs:
-      1: [w4]
-      2: [w3]
-      5: [w1, w2]
-
-We get the unskilled w4 because w1 and w2 are at their maximum
-backlogs, and w3 has a backloh of 2 and a score of only 3.8.
-
-Note that w4 is assigned a skill for foo of 4, which is min_score.
-
-XXX It's unclear if min_score provides much, if any, benefit.
-
 Worker disconnect
 =================
 
@@ -366,16 +240,7 @@
     >>> pool.remove(w3)
     >>> pool
     Request classes:
-      1: w4(1.0,1)
-      2: w4(2.0,1)
-      3: w4(3.0,1)
-      4: w4(4.0,1)
-      5: w4(5.0,1)
-      6: w4(6.0,1)
-      7: w4(7.0,1)
-      8: w4(8.0,1)
       bar: w2(2.0,5)
-      foo: w2(2.0,5), w4(4.0,1)
+      foo: w2(2.0,5)
     Backlogs:
-      1: [w4]
       5: [w2]

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2012-03-04 09:56:21 UTC (rev 124502)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2012-03-04 13:28:48 UTC (rev 124503)
@@ -30,6 +30,9 @@
 
 pid = os.getpid()
 
+###############################################################################
+# Bobo test app:
+
 @bobo.resource
 def hi(request):
     body = request.environ['wsgi.input'].read()
@@ -65,6 +68,9 @@
 def app():
     return bobo.Application(bobo_resources=__name__)
 
+#
+###############################################################################
+
 def test_classifier(env):
     return "yup, it's a test"
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-03-04 09:56:21 UTC (rev 124502)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-03-04 13:28:48 UTC (rev 124503)
@@ -24,12 +24,16 @@
 
 class Worker:
 
-    def __init__(self, app, addr, history=999,
+    def __init__(self, app, addr,
+                 history=9999, max_skill_age=None,
                  resume_file=None, threads=None, tracelog=None):
         self.app = app
         self.history = history
-        self.worker_request_number = 0
+        self.max_skill_age = max_skill_age or history * 10
+        self.decay = 1.0-1.0/history
         self.resume_file = resume_file
+        self.perf_data = {} # rclass -> (gen, decayed times, decayed counts)
+        self.generation = 0
         self.resume = {}
         if self.resume_file and os.path.exists(self.resume_file):
             try:
@@ -37,8 +41,11 @@
                     self.resume = marshal.load(f)
             except Exception:
                 logger.exception('reading resume file')
-        self.time_ring = []
-        self.time_ring_pos = 0
+            else:
+                for rclass, rpm in self.resume.iteritems():
+                    if rpm > 0:
+                        self.perf_data[rclass] = 0, 1.0/rpm, history
+
         self.connections = set()
 
         if threads:
@@ -109,6 +116,11 @@
     def update_settings(self, data):
         if 'history' in data:
             self.history = data['history']
+            if 'max_skill_age' not in data:
+                self.max_skill_age = self.history * 10
+        if 'max_skill_age' in data:
+            self.max_skill_age = data['max_skill_age']
+        self.decay = 1 - 1.0/self.history
 
     def stop(self):
         self.server.stop()
@@ -182,32 +194,35 @@
 
                 conn.put((rno, ''))
 
+                # Update resume
                 elapsed = max(time.time() - env['zc.resumelb.time'], 1e-9)
-                time_ring = self.time_ring
-                time_ring_pos = rno % self.history
                 rclass = env['zc.resumelb.request_class']
-                try:
-                    time_ring[time_ring_pos] = rclass, elapsed
-                except IndexError:
-                    while len(time_ring) <= time_ring_pos:
-                        time_ring.append((rclass, elapsed))
+                generation = self.generation + 1
+                perf_data = self.perf_data.get(rclass)
+                if perf_data:
+                    rgen, rtime, rcount = perf_data
+                else:
+                    rgen = generation
+                    rtime = rcount = 0
 
-                worker_request_number = self.worker_request_number + 1
-                self.worker_request_number = worker_request_number
-                if worker_request_number % self.history == 0:
-                    byrclass = {}
-                    for rclass, elapsed in time_ring:
-                        sumn = byrclass.get(rclass)
-                        if sumn:
-                            sumn[0] += elapsed
-                            sumn[1] += 1
-                        else:
-                            byrclass[rclass] = [elapsed, 1]
-                    self.new_resume(dict(
-                        (rclass, n/sum)
-                        for (rclass, (sum, n)) in byrclass.iteritems()
-                        ))
+                decay = self.decay ** (generation - rgen)
+                rgen = generation
+                rtime = rtime * decay + elapsed
+                rcount = rcount * decay + 1
 
+                self.generation = generation
+                self.perf_data[rclass] = rgen, rtime, rcount
+                self.resume[rclass] = rcount / rtime
+
+                if generation % self.history == 0:
+                    min_gen = generation - self.max_skill_age
+                    for rclass in [r for (r, d) in self.perf_data.iteritems()
+                                   if d[0] < min_gen]:
+                        del self.perf_data[rclass]
+                        del self.resume[rclass]
+
+                    self.new_resume(self.resume)
+
             except zc.resumelb.util.Disconnected:
                 return # whatever
             finally:

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2012-03-04 09:56:21 UTC (rev 124502)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2012-03-04 13:28:48 UTC (rev 124503)
@@ -10,10 +10,17 @@
     >>> worker = zc.resumelb.worker.Worker(
     ...   zc.resumelb.tests.app(), ('127.0.0.1', 0), history=5)
 
-Here we created a worker using a test application, telling it to
-an address to listen on and to update it's resume after every
+Here we created a worker using a test application, telling it
+an address to listen on and to compute it's resume based on roughly
 five requests.
 
+We could also have specified a max_skill_age parameter, which controls
+how long will keep an unused skill in the resume. It defaults to 10x
+the history:
+
+    >>> worker.max_skill_age
+    50
+
 Note that we passed 0 as the address port. This causes an ephemeral
 port to be used. We can get the actual address using ``worker.addr``.
 
@@ -179,9 +186,9 @@
 size and the request body hash.  Note that the hashes match the test
 bodies we created.
 
-We told the worker to use a history of length 5.  This means that it
-will keep track of times for the last 5 requests and compute a new
-resume after 5 requests.  Let's test that by making a 5th request.
+We told the worker to use a history of 5.  This means that it
+will send a new resume to the lb every 5 requests. Let's test that by
+making a 5th request.
 
     >>> write_message(worker_socket, 5, newenv('2', '/sleep.html?dur=.11'), '')
     >>> print_response(worker_socket, 5)
@@ -231,10 +238,10 @@
 
     >>> worker_socket2 = gevent.socket.create_connection(worker.addr)
 
-We're immediately send the worker's resume -- same as the old.
+We're immediately send the worker's resume.
 
-    >>> read_message(worker_socket2) == (zero, resume)
-    True
+    >>> read_message(worker_socket2)[0]
+    0
 
 And send simultaneous requests to each connection:
 
@@ -375,9 +382,13 @@
     ...     marshal.dump(dict(a=1.0, b=2.0), f)
 
     >>> worker = zc.resumelb.worker.Worker(
-    ...   zc.resumelb.tests.app(), ('127.0.0.1', 0), history=2,
+    ...   zc.resumelb.tests.app(), ('127.0.0.1', 0),
+    ...   history=2, max_skill_age=3,
     ...   resume_file='resume.mar')
 
+Note that we specified a max_skill_age if 4, rather than the default
+10x the history.
+
     >>> from pprint import pprint
     >>> env = newenv('test', '/hi.html')
     >>> worker_socket = gevent.socket.create_connection(worker.addr)
@@ -394,13 +405,28 @@
 At this point, the worker should have output a new resume.
 
     >>> rno, resume = read_message(worker_socket)
-    >>> rno, list(resume)
-    (0, ['test'])
+    >>> rno, sorted(resume)
+    (0, ['a', 'b', 'test'])
 
     >>> with open('resume.mar') as f:
-    ...     list(marshal.load(f))
-    ['test']
+    ...     sorted(marshal.load(f))
+    ['a', 'b', 'test']
 
+If we do 2 more requests:
+
+    >>> write_message(worker_socket, 3, env, '')
+    >>> print_response(worker_socket, 3) # doctest: +ELLIPSIS
+    3 200 OK...
+    >>> write_message(worker_socket, 4, env, '')
+    >>> print_response(worker_socket, 4) # doctest: +ELLIPSIS
+    4 200 OK...
+
+We'll get a new resume that ha sdropped the 'a' and 'b' skills:
+
+    >>> rno, resume = read_message(worker_socket)
+    >>> rno, sorted(resume)
+    (0, ['test'])
+
 Ignore empty strings in application iterables
 ---------------------------------------------
 
@@ -572,3 +598,98 @@
     >>> logger.removeHandler(handler)
     >>> logger.setLevel(logging.NOTSET)
     >>> worker.stop()
+
+Updating worker settings
+------------------------
+
+Workers have an update_settings method that can be used to update
+settings at run time. It takes a settings dictionary.
+
+    >>> worker = zc.resumelb.worker.Worker(
+    ...   zc.resumelb.tests.app(), ('127.0.0.1', 0),
+    ...   history=2)
+
+    >>> worker.history, worker.max_skill_age
+    (2, 20)
+
+    >>> worker.update_settings(dict(history=5))
+    >>> worker.history, worker.max_skill_age
+    (5, 50)
+
+    >>> worker.update_settings(dict(history=5, max_skill_age=9))
+    >>> worker.history, worker.max_skill_age
+    (5, 9)
+
+    >>> worker.update_settings(dict(max_skill_age=99))
+    >>> worker.history, worker.max_skill_age
+    (5, 99)
+
+Resume computation and decay
+----------------------------
+
+Workers keep running totals of how well they do with request classes.
+These totals are decayed using a decay computed from the history.
+
+    >>> .79999 < worker.decay < .8001
+    True
+    >>> worker.update_settings(dict(history=2))
+    >>> worker.decay
+    0.5
+    >>> worker.update_settings(dict(history=1))
+    >>> worker.decay
+    0.0
+
+    >>> worker_socket = gevent.socket.create_connection(worker.addr)
+    >>> read_message(worker_socket)
+    (0, {})
+
+    >>> rno = 0
+    >>> env = newenv('test', '/hi.html')
+    >>> def assert_(cond, mess=None):
+    ...     if not cond:
+    ...         print 'assertion failed', mess
+    >>> def make_request():
+    ...     global rno
+    ...     rno += 1
+    ...     write_message(worker_socket, rno, env, '')
+    ...     assert_(read_message(worker_socket)[0] == rno)
+    ...     assert_(read_message(worker_socket)[0] == rno)
+    ...     assert_(read_message(worker_socket) == (rno, ''))
+
+With a decay of 0, the resume is based on the speed of the previous
+request:
+
+    >>> now = rtime = 1.0
+    >>> def faux_time():
+    ...     global now
+    ...     now += rtime
+    ...     return now
+
+    >>> with mock.patch('time.time') as time:
+    ...     time.side_effect = faux_time
+    ...     make_request()
+
+    >>> read_message(worker_socket)
+    (0, {'test': 1.0})
+
+    >>> rtime = 2.0
+
+    >>> with mock.patch('time.time') as time:
+    ...     time.side_effect = faux_time
+    ...     make_request()
+
+    >>> read_message(worker_socket)
+    (0, {'test': 0.5})
+
+With a decay of .5:
+
+    >>> worker.update_settings(dict(history=2))
+    >>> with mock.patch('time.time') as time:
+    ...     time.side_effect = faux_time
+    ...     rtime = 2.0
+    ...     make_request()
+    ...     rtime = 0.5
+    ...     make_request()
+
+    >>> read_message(worker_socket)
+    (0, {'test': 0.875})

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.test	2012-03-04 09:56:21 UTC (rev 124502)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.test	2012-03-04 13:28:48 UTC (rev 124503)
@@ -12,7 +12,8 @@
     ...   /lb
     ...     /providers
     ...     /workers
-    ...       history=999
+    ...       history = 32
+    ...       max_skill_age = 90
     ...       /providers
     ... """)
 
@@ -72,9 +73,15 @@
 
 The worker got it's settings from the tree:
 
+    >>> worker.max_skill_age
+    90
+
     >>> worker.history
-    999
+    32
 
+    >>> worker.decay
+    0.96875
+
 It loaded it's resume from resume.mar:
 
     >>> from pprint import pprint
@@ -189,7 +196,7 @@
     >>> len(lb.pool.workers)
     2
 
-OK, so let's try a more complex examlpe.  Maybe we can exercise all of
+OK, so let's try a more complex example.  Maybe we can exercise all of
 the options!
 
     >>> server.stop()
@@ -257,8 +264,8 @@
 By looking at the lb's pool's skilled data structure, we can see that
 the test request classifier was used.
 
-    >>> list(lb.pool.skilled)
-    ['a', 'b', "yup, it's a test"]
+    >>> sorted(lb.pool.skilled)
+    ['a', 'b', 'h1.com', "yup, it's a test"]
 
 Let's shut down the workers:
 
@@ -272,7 +279,6 @@
     >>> len(lb.workletts)
     0
 
-
 Not that if we looked at the number of pools, it would still be
 2. This is an artifact of the way the test is run.  We shutdown/killed
 the servers, but we didn't close the open worker connections.  We



More information about the checkins mailing list