[Checkins] SVN: Sandbox/J1m/resumelb/ Added a simulation script.

Jim Fulton jim at zope.com
Fri Jan 6 10:41:10 UTC 2012


Log message for revision 123965:
  Added a simulation script.
  
  Refactored the way unskilled workers are handled.
  
  Changed the way settings are handled to facilitate managing settings
  with ZooKeeper (as the simulation script does).
  

Changed:
  U   Sandbox/J1m/resumelb/buildout.cfg
  U   Sandbox/J1m/resumelb/setup.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
  A   Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.py

-=-
Modified: Sandbox/J1m/resumelb/buildout.cfg
===================================================================
--- Sandbox/J1m/resumelb/buildout.cfg	2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/buildout.cfg	2012-01-06 10:41:09 UTC (rev 123965)
@@ -1,6 +1,5 @@
 [buildout]
 develop = .
-#parts = test py
 parts = gevent py ctl
 
 [ctl]
@@ -25,7 +24,10 @@
 recipe = zc.recipe.egg
 eggs = ${test:eggs}
        PasteScript
-       
+       zc.zk [static]
+       pylru
+       ZODB3
+entry-points = simul=zc.resumelb.simul:main       
 interpreter = py
 
 [lb]

Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/setup.py	2012-01-06 10:41:09 UTC (rev 123965)
@@ -14,7 +14,8 @@
 name, version = 'zc.resumelb', '0'
 
 install_requires = [
-    'setuptools', 'gevent', 'WebOb', 'zc.thread', 'zc.mappingobject']
+    'setuptools', 'gevent', 'WebOb', 'zc.thread', 'zc.parse_addr',
+    'zc.mappingobject']
 extras_require = dict(
     test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-01-06 10:41:09 UTC (rev 123965)
@@ -1,4 +1,4 @@
-import bisect
+from bisect import bisect_left, insort
 import gevent
 import gevent.hub
 import gevent.pywsgi
@@ -6,6 +6,7 @@
 import logging
 import sys
 import webob
+import zc.mappingobject
 import zc.resumelb.util
 
 block_size = 1<<16
@@ -22,11 +23,12 @@
 class LB:
 
     def __init__(self, worker_addr, classifier,
-                 disconnect_message=default_disconnect_message
+                 settings=None,
+                 disconnect_message=default_disconnect_message,
                  ):
         self.classifier = classifier
         self.disconnect_message = disconnect_message
-        self.pool = Pool()
+        self.pool = Pool(settings)
         self.worker_server = gevent.server.StreamServer(
             worker_addr, self.handle_worker)
         self.worker_server.start()
@@ -62,117 +64,164 @@
 
 class Pool:
 
-    def __init__(self, max_backlog=40):
-        self.max_backlog = max_backlog
-        self.unskilled = [set() for i in range(max_backlog+1)]
-        self.skilled = {}
-        self.resumes = {}
-        self.backlogs = {}
+    def __init__(self, settings=None):
+        if settings is None:
+            settings = dict(
+                max_backlog = 40,
+                unskilled_score = 1.0,
+                )
+        self.settings = settings
+        self.workers = set()
+        self.unskilled = [] # sorted([(uscore, poolworker)])
+        self.skilled = {}   # rclass -> {(score, workers)}
+        self.nskills = 0    # sum of resume lengths
         self.event = gevent.event.Event()
 
     def __repr__(self):
-        skilled = self.skilled
-        backlogs = self.backlogs
         outl = []
         out = outl.append
         out('Request classes:')
-        for rclass in sorted(skilled):
+        for (rclass, skilled) in sorted(self.skilled.items()):
             out('  %s: %s'
                 % (rclass,
-                   ', '.join('%s(%s,%s)' % (worker, score, backlogs[worker])
-                             for (score, worker) in skilled[rclass])
+                   ', '.join(
+                       '%s(%s,%s)' %
+                       (worker, score, worker.backlog)
+                       for (score, worker) in sorted(skilled)
                    ))
+                )
         out('Backlogs:')
-        for backlog, workers in enumerate(self.unskilled):
-            if workers:
-                out('  %s: %s' % (backlog, sorted(workers)))
+        backlogs = {}
+        for worker in self.workers:
+            backlogs.setdefault(worker.backlog, []).append(worker)
+        for backlog, workers in sorted(backlogs.items()):
+            out('  %s: %r' % (backlog, sorted(workers)))
         return '\n'.join(outl)
 
-    def new_resume(self, worker, resume):
+    def new_resume(self, worker, resume=None):
         skilled = self.skilled
-        resumes = self.resumes
-        try:
-            old = resumes[worker]
-        except KeyError:
-            self.backlogs[worker] = 0
-            self.unskilled[0].add(worker)
-            self.event.set()
+        unskilled = self.unskilled
+        if worker in self.workers:
+            if worker.backlog < self.settings['max_backlog']:
+                del unskilled[bisect_left(unskilled, (worker.uscore, worker))]
+            for rclass, score in worker.resume.iteritems():
+                skilled[rclass].remove((score, worker))
+            self.nskills -= len(worker.resume)
         else:
-            for rclass, score in old.iteritems():
-                workers = skilled[rclass]
-                del workers[bisect.bisect_left(workers, (score, worker))]
+            self.workers.add(worker)
+            worker.backlog = 0
 
-        for rclass, score in resume.iteritems():
-            bisect.insort(skilled.setdefault(rclass, []), (score, worker))
+        if resume is None:
+            self.workers.remove(worker)
+        else:
+            worker.resume = resume
+            self.nskills += len(resume)
+            if resume:
+                scores = sorted(resume.values())
+                worker.unskilled_score = max(
+                    self.settings['unskilled_score'],
+                    scores[
+                        min(
+                            max(3, len(scores)/4),
+                            len(scores)-1,
+                            )
+                        ] / 10.0
+                    )
+            else:
+                worker.unskilled_score = (
+                    self.settings['unskilled_score'] * (1.0 + self.nskills) /
+                    len(self.workers))
 
-        resumes[worker] = resume
+            uscore = (
+                worker.unskilled_score /
+                (1.0 + worker.backlog)
+                )
+            worker.uscore = uscore
+            insort(unskilled, (uscore, worker))
+            for rclass, score in resume.iteritems():
+                try:
+                    skilled[rclass].add((score, worker))
+                except KeyError:
+                    skilled[rclass] = set(((score, worker), ))
 
+
+        if self.unskilled:
+            self.event.set()
+
     def remove(self, worker):
-        self.new_resume(worker, {})
-        backlog = self.backlogs.pop(worker)
-        self.unskilled[backlog].remove(worker)
-        del self.resumes[worker]
+        self.new_resume(worker)
 
     def get(self, rclass, timeout=None):
         """Get a worker to handle a request class
         """
-        max_backlog = self.max_backlog
-        backlogs = self.backlogs
+
         unskilled = self.unskilled
-        while 1:
+        if not unskilled:
+            self.event.wait(timeout)
+            if not self.unskilled:
+                return None
 
-            # Look for a skilled worker
-            best_score = 0
-            for score, worker in reversed(self.skilled.get(rclass, ())):
-                backlog = backlogs[worker] + 1
-                if backlog > max_backlog:
-                    continue
-                score /= backlog
-                if score <= best_score:
-                    break
+        # Look for a skilled worker
+        best_score, unskilled_worker = unskilled[-1]
+        best_worker = best_backlog = None
+        max_backlog = self.settings['max_backlog']
+        skilled = self.skilled.get(rclass, ())
+        for score, worker in skilled:
+            backlog = worker.backlog + 1
+            if backlog > max_backlog:
+                continue
+            score /= backlog
+            if (score > best_score
+                or
+                (best_worker is None and worker is unskilled_worker)
+                ):
                 best_score = score
+                best_worker = worker
                 best_backlog = backlog
-                best_worker = worker
 
-            if best_score:
-                unskilled[best_backlog-1].remove(best_worker)
-                unskilled[best_backlog].add(best_worker)
-                backlogs[best_worker] = best_backlog
-                return best_worker
+        if best_worker is not None:
+            uscore = best_worker.uscore
+            del unskilled[bisect_left(unskilled, (uscore, best_worker))]
+        else:
+            uscore, best_worker = unskilled.pop()
+            best_backlog = best_worker.backlog + 1
+            self.nskills += 1
+            resume = best_worker.resume
+            score = max(uscore, self.settings['unskilled_score'] * 10)
+            best_worker.resume[rclass] = score
+            if skilled == ():
+                self.skilled[rclass] = set(((score, best_worker),))
+            else:
+                skilled.add((score, best_worker))
+            lresume = len(resume)
+            uscore *= lresume/(lresume + 1.0)
 
-            # Look for an unskilled worker
-            for backlog, workers in enumerate(unskilled):
-                if workers:
-                    worker = workers.pop()
-                    backlog += 1
-                    try:
-                        unskilled[backlog].add(worker)
-                    except IndexError:
-                        workers.add(worker)
-                    else:
-                        backlogs[worker] = backlog
-                        resume = self.resumes[worker]
-                        if rclass not in resume:
-                            self.resumes[worker][rclass] = 1.0
-                            bisect.insort(self.skilled.setdefault(rclass, []),
-                                          (1.0, worker))
-                        return worker
+        uscore *= best_backlog / (1.0 + best_backlog)
+        best_worker.uscore = uscore
+        best_worker.backlog = best_backlog
+        if best_backlog < max_backlog:
+            insort(unskilled, (uscore, best_worker))
+        return best_worker
 
-            # Dang. Couldn't find a worker, either because we don't
-            # have any yet, or because they're all too busy.
-            self.event.clear()
-            self.event.wait(timeout)
-            if timeout is not None and not self.event.is_set():
-                return None
-
     def put(self, worker):
-        backlogs = self.backlogs
+        backlog = worker.backlog
+        if backlog < 1:
+            return
         unskilled = self.unskilled
-        backlog = backlogs[worker]
-        unskilled[backlog].remove(worker)
+        max_backlog = self.settings['max_backlog']
+        uscore = worker.uscore
+        if backlog < max_backlog:
+            del unskilled[bisect_left(unskilled, (uscore, worker))]
+
+        uscore *= (backlog + 1.0) / backlog
+        worker.uscore = uscore
+
         backlog -= 1
-        unskilled[backlog].add(worker)
-        backlogs[worker] = backlog
+        worker.backlog = backlog
+
+        if backlog < max_backlog:
+            insort(unskilled, (uscore, worker))
+
         self.event.set()
 
 class Worker(zc.resumelb.util.Worker):
@@ -197,6 +246,9 @@
             else:
                 readers[rno](data)
 
+    def __repr__(self):
+        return "worker-%s" % id(self)
+
     def handle(self, rclass, env, start_response):
         logger.debug('handled by %s', self.addr)
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-06 10:41:09 UTC (rev 123965)
@@ -15,11 +15,11 @@
     >>> from zc.resumelb.util import read_message, write_message
     >>> worker1 = gevent.socket.create_connection(
     ...    ('127.0.0.1', lb.worker_server.server_port))
-    >>> write_message(worker1, 0, {'h1.com': 1.0})
+    >>> write_message(worker1, 0, {'h1.com': 10.0})
 
     >>> worker2 = gevent.socket.create_connection(
     ...    ('127.0.0.1', lb.worker_server.server_port))
-    >>> write_message(worker2, 0, {'h2.com': 1.0})
+    >>> write_message(worker2, 0, {'h2.com': 10.0})
 
 Now, let's make a request and make sure the data gets where it's
 supposed to go.
@@ -64,6 +64,7 @@
     ...     app2.put, '/hi.html', 'i'*200000, [('Host', 'h1.com')])
 
     >>> rno, env2 = read_message(worker1)
+
     >>> rno
     2
     >>> pprint(env2)
@@ -176,7 +177,7 @@
 At this point, there are no outstanding requests.  The pool back-logs
 should all be 0:
 
-    >>> sum(lb.pool.backlogs.values())
+    >>> sum(worker.backlog for worker in lb.pool.workers)
     0
 
 Worker disconnection

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-01-06 10:41:09 UTC (rev 123965)
@@ -14,7 +14,7 @@
 We'll test the pool with stand-ins for the local workers.
 
     >>> import zc.resumelb.lb
-    >>> pool = zc.resumelb.lb.Pool(max_backlog=5)
+    >>> pool = zc.resumelb.lb.Pool(dict(max_backlog=5, unskilled_score=1.0))
 
 We specified a maximum per-worker backlog for the pool.  We specified
 a fairly low max backlog to make it easier to see what happens when
@@ -37,6 +37,9 @@
     ...         return cmp(self.name, other.name)
     ...     def __hash__(self):
     ...         return hash(self.name)
+    ...     def handle(self, *args):
+    ...         pass
+    ...     Disconnected = None
 
     >>> w1 = Worker('w1')
     >>> pool.new_resume(w1, {})
@@ -71,8 +74,8 @@
 
     >>> pool
     Request classes:
-      bar: w2(1.0,1)
-      foo: w1(1.0,2)
+      bar: w2(10.0,1)
+      foo: w1(10.0,2)
     Backlogs:
       1: [w2]
       2: [w1]
@@ -90,8 +93,8 @@
 
     >>> pool
     Request classes:
-      bar: w2(1.0,1)
-      foo: w1(1.0,5)
+      bar: w2(10.0,1)
+      foo: w1(10.0,5)
     Backlogs:
       0: [w3]
       1: [w2]
@@ -107,8 +110,8 @@
 
     >>> pool
     Request classes:
-      bar: w2(1.0,3)
-      foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
+      bar: w2(10.0,3)
+      foo: w1(10.0,5), w2(10.0,3), w3(10.0,5)
     Backlogs:
       3: [w2]
       5: [w1, w3]
@@ -131,49 +134,61 @@
     >>> pool.put(w3)
     >>> pool
     Request classes:
-      bar: w2(1.0,4)
-      foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
+      bar: w2(10.0,4)
+      foo: w1(10.0,2), w2(10.0,4), w3(10.0,3)
     Backlogs:
       2: [w1]
       3: [w3]
       4: [w2]
 
-Now, when we get a worker, we'll get w3.
+Now, when we get a worker, we'll get w1.
 
     >>> pool.get('foo', 0.0)
-    w3
+    w1
 
-Why? We adjust each score by the worker's backlog and search workers
-from high score to low until the adjusted score increases. Because w2
-has a higher backlog than w3, it's adjusted score is lower so we stop
-looking.  This is for 2 reasons:
+Why? We adjust each score by the worker's backlog.
 
-1. We want to bias selection towards a smaller number of workers,
-   ideally those with the best scores,
-
-2. We want to reduce the amount of work done in each get call.
-
 Now that we've done some work, let's update the resumes.  This will
 normally be done by workers after periodically collecting performance
 data.
 
-    >>> pool.new_resume(w1, {'foo': 3.0})
+    >>> pool.new_resume(w1, {'foo': 30.0})
 
-    >>> pool.new_resume(w2, {'bar': 1.0, 'foo': 1.0})
+    ;>>> pool.new_resume(w2, {'bar': 10.0, 'foo': 10.0})
 
-    >>> pool.new_resume(w3, {'foo': 2.0})
+    >>> pool.new_resume(w3, {'foo': 19.0})
 
-    pool
+    >>> pool
     Request classes:
-      bar: w2(1.0,4)
-      foo: w2(1.0,4), w3(2.0,4), w1(3.0,2)
+      bar: w2(10.0,4)
+      foo: w2(10.0,4), w3(19.0,3), w1(30.0,3)
     Backlogs:
-      2: [w1]
-      4: [w2, w3]
+      3: [w1, w3]
+      4: [w2]
 
-    >>> [pool.get('foo') for i in range(5)]
-    [w1, w1, w1, w3, w2]
+    >>> pool.get('foo')
+    w1
+    >>> pool.get('foo')
+    w1
 
+    >>> pool
+    Request classes:
+      bar: w2(10.0,4)
+      foo: w2(10.0,4), w3(19.0,3), w1(30.0,5)
+    Backlogs:
+      3: [w3]
+      4: [w2]
+      5: [w1]
+
+Because w1 has reached the maximum backlog, it's out of the running.
+
+    >>> pool.get('foo')
+    w3
+    >>> pool.get('foo')
+    w3
+    >>> pool.get('foo')
+    w2
+
     >>> pool.put(w1)
     >>> pool.put(w3)
     >>> pool.put(w3)
@@ -182,8 +197,8 @@
     >>> pool.put(w3)
     >>> pool
     Request classes:
-      bar: w2(1.0,5)
-      foo: w2(1.0,5), w3(2.0,0), w1(3.0,4)
+      bar: w2(10.0,5)
+      foo: w2(10.0,5), w3(19.0,0), w1(30.0,4)
     Backlogs:
       0: [w3]
       4: [w1]
@@ -195,10 +210,10 @@
 When a worker disconnects, it's removed from the pool:
 
     >>> pool.remove(w1)
+    >>> pool.remove(w3)
     >>> pool
     Request classes:
-      bar: w2(1.0,5)
-      foo: w2(1.0,5), w3(2.0,4)
+      bar: w2(10.0,5)
+      foo: w2(10.0,5)
     Backlogs:
-      4: [w3]
       5: [w2]

Added: Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/simul.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/simul.py	2012-01-06 10:41:09 UTC (rev 123965)
@@ -0,0 +1,388 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Simulation
+
+Client makes requests.  It has some number of outstanding requests. It
+accomplishes this through a pool of greenlets.
+
+Have some number of workers.
+
+Each worker app has an lru cache of a given size.
+
+There are some number of "sites".  Each site has a number of objects.
+
+A request "requests" a set of objects for a site.  The worker app
+sleeps .01 for every object not in its cache.
+
+The whole thing is controlled from a zookeeper node with properties:
+
+- history lb worker history, which controls how many requests to
+  perform between resume updates.
+- sim_cache_size Size of client lru cache
+- sim_clients # concurrent requests
+- sim_lambda exponential distribution parameter for selecting sites (.1)
+- sim_objects_per_site average number of objects per site.
+- sim_objects_per_request
+- sim_sites number of sites
+- sim_workers number of workers
+
+"""
+import json
+import logging
+import os
+import pylru
+import random
+import sys
+import time
+import threading
+import zc.mappingobject
+import zc.parse_addr
+import zc.thread
+import zc.zk
+import zookeeper
+
+logger = logging.getLogger(__name__)
+
+class Sample:
+
+    def __init__(self, size=1000, data=None):
+        self.size = size
+        self.data = data or []
+        self.n = len(self.data)
+
+    def add(self, v):
+        self.n += 1
+        try:
+            self.data[self.n % self.size] = v
+        except IndexError:
+            self.data.append(v)
+
+    def stats(self, prefix=None):
+        data = sorted(self.data)
+        return {
+            'n': self.n,
+            'mean': float(sum(data))/len(data),
+            'min': data[0],
+            '10': data[len(data)/10],
+            '50': data[len(data)/2],
+            '90': data[9*len(data)/10],
+            'max': data[-1],
+            }
+
+    def __repr__(self):
+        return ("%(n)s %(min)s %(10)s %(50)s(%(mean)s) %(90)s %(max)s" %
+                self.stats(''))
+
+class App:
+
+    def __init__(self, properties):
+        settings = zc.mappingobject.mappingobject(properties)
+        self.cache_size = settings.sim_cache_size
+        self.cache = pylru.lrucache(self.cache_size)
+        self.hitrates = Sample()
+
+        @properties
+        def changed(*a):
+            if settings.sim_cache_size != self.cache_size:
+                self.cache_size = settings.sim_cache_size
+                self.cache.size(self.cache_size)
+
+    def __call__(self, environ, start_response):
+        """Simplest possible application object"""
+
+        n = nhit = nmiss = nevict = 0
+        for oid in environ['PATH_INFO'].rsplit('/', 1)[1].split('_'):
+            n += 1
+            key = environ['HTTP_HOST'], oid
+            if key in self.cache:
+                nhit += 1
+            else:
+                nmiss += 1
+                if len(self.cache) >= self.cache_size:
+                    nevict += 1
+                self.cache[key] = 1
+
+                time.sleep(.01)
+
+        result = ' '.join(map(str, (os.getpid(), n, nhit, nmiss, nevict)))+'\n'
+        response_headers = [
+            ('Content-type', 'text/plain'),
+            ('Content-Length', str(len(result))),
+            ]
+        start_response('200 OK', response_headers)
+        if n:
+            self.hitrates.add(100.0*nhit/n)
+        return [result]
+
+def worker(path):
+    import logging
+    logging.basicConfig()
+    logger = logging.getLogger(__name__+'-worker')
+    try:
+        import zc.resumelb.worker
+        import zc.zk
+
+        zk = zc.zk.ZooKeeper()
+        lbpath = path + '/lb'
+        while not (zk.exists(lbpath) and zk.get_children(lbpath)):
+            time.sleep(.01)
+        [lbaddr] = zk.get_children(lbpath)
+
+        properties = zk.properties(path)
+
+        class Worker(zc.resumelb.worker.Worker):
+
+            def new_resume(self, resume):
+                stats = dict(hitrate=str(app.hitrates))
+                stats.update(resume)
+                zk.set(worker_path, json.dumps(stats))
+                zc.resumelb.worker.Worker.new_resume(self, resume)
+
+        worker_path = path + '/workers/%s' % os.getpid()
+
+        zk.create(worker_path, '',
+                  zc.zk.OPEN_ACL_UNSAFE, zookeeper.EPHEMERAL)
+
+        app = App(properties)
+        Worker(app, zc.parse_addr.parse_addr(lbaddr), properties)
+    except:
+        logger.exception('worker')
+
+def clients(path):
+    import zc.zk
+    zk = zc.zk.ZooKeeper()
+
+    properties = zk.properties(path)
+    settings = zc.mappingobject.mappingobject(properties)
+
+    logging.basicConfig()
+
+    wpath = path + '/wsgi'
+    while not (zk.exists(wpath) and zk.get_children(wpath)):
+        time.sleep(.01)
+    [waddr] = zk.get_children(wpath)
+    waddr = zc.parse_addr.parse_addr(waddr)
+
+    stats = zc.mappingobject.mappingobject(dict(
+        sim_truncated = 0,
+        sim_requests = 0,
+        sim_bypid = {},
+        sim_nobs = 0,
+        sim_nhits = 0,
+        ))
+
+    spath = path + '/stats'
+    if not zk.exists(spath):
+        zk.create(spath, '', zc.zk.OPEN_ACL_UNSAFE)
+
+    import gevent.socket
+
+    def do_request():
+        siteid = random.randint(0, settings.sim_sites)
+        oids = set(
+            int(random.gauss(0, settings.sim_objects_per_site/4))
+            for i in range(settings.sim_objects_per_request)
+            )
+        socket = gevent.socket.create_connection(waddr)
+        try:
+            socket.sendall(
+                request_template % dict(
+                    data='_'.join(map(str, oids)),
+                    host='h%s' % siteid,
+                    )
+                )
+            response = ''
+            while '\r\n\r\n' not in response:
+                data = socket.recv(9999)
+                if not data:
+                    stats.sim_truncated += 1
+                    return
+                response += data
+            headers, body = response.split('\r\n\r\n')
+            headers = headers.split('\r\n')
+            status = headers.pop(0)
+            headers = dict(l.strip().lower().split(':', 1)
+                           for l in headers if ':' in l)
+            content_length = int(headers['content-length'])
+            while len(body) < content_length:
+                data = socket.recv(9999)
+                if not data:
+                    stats.sim_truncated += 1
+                    return
+                body += data
+
+            pid, n, nhit, nmiss, nevict = map(int, body.strip().split())
+            stats.sim_requests += 1
+            stats.sim_nobs += n
+            stats.sim_nhits += nhit
+            bypid = stats.sim_bypid.get(pid)
+            if bypid is None:
+                bypid = stats.sim_bypid[pid] = dict(nr=0, n=0, nhit=0)
+            bypid['nr'] += 1
+            bypid['n'] += n
+            bypid['nhit'] += nhit
+            logger.info(' '.join(map(str, (
+                100*stats.sim_nhits/stats.sim_nobs,
+                pid, n, nhit, 100*nhit/n,
+                ))))
+        finally:
+            socket.close()
+
+    def client():
+        try:
+            while 1:
+                do_request()
+        except:
+            print 'client error'
+            logging.getLogger(__name__+'-client').exception('client')
+
+    greenlets = [gevent.spawn(client) for i in range(settings.sim_clients)]
+
+    import gevent.queue, zc.resumelb.thread
+    update_queue = gevent.queue.Queue()
+
+    @properties
+    def update(*a):
+        print 'put update'
+        update_queue.put(None)
+        zc.resumelb.thread.wake_gevent()
+
+    while 1:
+        update_queue.get()
+        print 'got update event'
+        while settings.sim_clients > len(greenlets):
+            greenlets.append(gevent.spawn(client))
+        while settings.sim_clients < len(greenlets):
+            greenlets.pop().kill()
+
+request_template = """GET /%(data)s HTTP/1.1\r
+Host: %(host)s\r
+\r
+"""
+
+class LBLogger:
+
+    def __init__(self, lb, zk, path):
+        self.lb = lb
+        self.requests = Sample()
+        self.nr = self.requests.n
+        self.zk = zk
+        self.path = path
+        self.then = time.time()
+
+    def write(self, line):
+        status, _, t = line.split()[-3:]
+        if status != '200':
+            print 'error', line
+        self.requests.add(float(t))
+        if ((time.time() - self.then > 30)
+            #or self.nr < 30
+            ):
+            pool = self.lb.pool
+            self.then = time.time()
+            print
+            print 'requests', self.requests.n-self.nr, self.requests
+            self.nr = self.requests.n
+            print pool.unskilled
+            print 'backlogs', str(Sample(data=[
+                worker.backlog for worker in pool.workers]))
+            print 'resumes', str(Sample(data=[
+                len(worker.resume) for worker in pool.workers]))
+            print 'skilled', str(Sample(
+                data=map(len, pool.skilled.values())))
+
+            for rclass, skilled in sorted(pool.skilled.items()):
+                if (len(skilled) > len(pool.workers) or
+                    len(set(i[1] for i in skilled)) != len(skilled)
+                    ):
+                    print 'bad skilled', sorted(skilled, key=lambda i: i[1])
+
+
+
+
+
+
+
+            # print 'backlogs', str(Sample(data=self.lb.pool.backlogs.values()))
+            # print 'resumes', str(Sample(
+            #     data=map(len, self.lb.pool.resumes.values())))
+            # print 'skilled', str(Sample(
+            #         data=map(len, self.lb.pool.skilled.values())))
+            # self.zk.set(self.path, json.dumps(dict(
+            #     requests = str(self.requests),
+            #     backlogs = str(Sample(data=self.lb.pool.backlogs.values())),
+            #     resumes = str(Sample(
+            #         data=map(len, self.lb.pool.resumes.values()))),
+            #     skilled = str(Sample(
+            #         data=map(len, self.lb.pool.skilled.values()))),
+            #     )))
+
+def main(args=None):
+    if args is None:
+        args = sys.argv[1:]
+    [path] = args
+    logging.basicConfig()
+
+    @zc.thread.Process(args=(path,))
+    def lb(path):
+        import logging
+        logging.basicConfig()
+        logger = logging.getLogger(__name__+'-lb')
+        try:
+            import zc.resumelb.lb
+            import gevent.pywsgi
+            zk = zc.zk.ZooKeeper()
+            lb = zc.resumelb.lb.LB(
+                ('127.0.0.1', 0), zc.resumelb.lb.host_classifier,
+                settings=zk.properties(path))
+            lbpath = path + '/lb'
+            if not zk.exists(lbpath):
+                zk.create(lbpath, '', zc.zk.OPEN_ACL_UNSAFE)
+            zk.register_server(
+                lbpath, ('127.0.0.1', lb.worker_server.server_port))
+
+            wsgi_server = gevent.pywsgi.WSGIServer(
+                ('127.0.0.1', 0), lb.handle_wsgi, log=LBLogger(lb, zk, lbpath),
+                )
+            wsgi_server.start()
+            wpath = path + '/wsgi'
+            if not zk.exists(wpath):
+                zk.create(wpath, '', zc.zk.OPEN_ACL_UNSAFE)
+            zk.register_server(wpath, ('127.0.0.1', wsgi_server.server_port))
+            wsgi_server.serve_forever()
+        except:
+            logger.exception('lb')
+
+    zk = zc.zk.ZooKeeper()
+
+    workers_path = path + '/workers'
+    if not zk.exists(workers_path):
+        zk.create(workers_path, '', zc.zk.OPEN_ACL_UNSAFE)
+
+    properties = zk.properties(path)
+    settings = zc.mappingobject.mappingobject(properties)
+
+    workers = [zc.thread.Process(worker, args=(path,))
+               for i in range(settings.sim_workers)]
+
+    clients_process = zc.thread.Process(clients, args=(path,))
+
+    @properties
+    def update(*a):
+        while settings.sim_workers > len(workers):
+            workers.append(zc.thread.Process(worker, args=(path,)))
+        while settings.sim_workers < len(workers):
+            workers.pop().terminate()
+
+    threading.Event().wait() # sleep forever


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.py	2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.py	2012-01-06 10:41:09 UTC (rev 123965)
@@ -41,20 +41,23 @@
     except EnvironmentError:
         pass
 
-gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST, \
-    _core_pipe_read, _core_pipe_read_callback).add()
+gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST,
+                  _core_pipe_read, _core_pipe_read_callback).add()
 
+def wake_gevent():
+    os.write(_core_pipe_write, '\0')
+
 # MTAsyncResult is greatly simplified from version in https://bitbucket.org/
 #   denis/gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
 class MTAsyncResult(gevent.event.AsyncResult):
 
     def set_exception(self, exception):
         gevent.event.AsyncResult.set_exception(self, exception)
-        os.write(_core_pipe_write, '\0')
+        wake_gevent()
 
     def set(self, value=None):
         gevent.event.AsyncResult.set(self, value)
-        os.write(_core_pipe_write, '\0')
+        wake_gevent()
 
 #
 ###############################################################################

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-01-06 10:41:09 UTC (rev 123965)
@@ -98,15 +98,19 @@
                         sumn[1] += 1
                     else:
                         byrclass[rclass] = [elapsed, 1]
-                self.resume = dict(
+                self.new_resume(dict(
                     (rclass, n/sum)
                     for (rclass, (sum, n)) in byrclass.iteritems()
-                    )
-                self.put((0, self.resume))
+                    ))
 
         except self.Disconnected:
             return # whatever
 
+    def new_resume(self, resume):
+        self.resume = resume
+        self.put((0, resume))
+
+
 def server_runner(app, global_conf, lb, history=500): # paste deploy hook
     logging.basicConfig(level=logging.INFO)
     host, port = lb.split(':')



More information about the checkins mailing list