[Checkins] SVN: Sandbox/J1m/resumelb/s Updated simulation script to work with recent changes.

Jim Fulton jim at zope.com
Tue Jan 31 12:50:13 UTC 2012


Log message for revision 124268:
  Updated simulation script to work with recent changes.
  

Changed:
  A   Sandbox/J1m/resumelb/simul_setup.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/simul.py

-=-
Added: Sandbox/J1m/resumelb/simul_setup.py
===================================================================
--- Sandbox/J1m/resumelb/simul_setup.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/simul_setup.py	2012-01-31 12:50:12 UTC (rev 124268)
@@ -0,0 +1,20 @@
+import zc.zk
+
+zk = zc.zk.ZooKeeper()
+
+zk.import_tree("""
+/simul
+  cache_size=20000
+  clients=12
+  lambda=0.01
+  objects_per_site=1000
+  objects_per_request=100
+  sites=40
+  workers=2
+  /lb
+    /providers
+    /workers
+      history=999
+      threads=1
+      /providers
+""", trim=True)


Property changes on: Sandbox/J1m/resumelb/simul_setup.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/simul.py	2012-01-31 12:50:09 UTC (rev 124267)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/simul.py	2012-01-31 12:50:12 UTC (rev 124268)
@@ -29,13 +29,13 @@
 
 - history lb worker history, which controls how many requests to
   perform between resume updates.
-- sim_cache_size Size of client lru cache
-- sim_clients # concurrent requests
-- sim_lambda exponential distribution parameter for selecting sites (.1)
-- sim_objects_per_site average number of objects per site.
-- sim_objects_per_request
-- sim_sites number of sites
-- sim_workers number of workers
+- cache_size Size of client lru cache
+- clients # concurrent requests
+- lambda exponential distribution parameter for selecting sites (.1)
+- objects_per_site average number of objects per site.
+- objects_per_request
+- sites number of sites
+- workers number of workers
 
 """
 from pprint import pprint
@@ -52,6 +52,7 @@
 import zc.thread
 import zc.zk
 import zookeeper
+import zope.testing.wait
 
 logger = logging.getLogger(__name__)
 
@@ -89,14 +90,14 @@
 
     def __init__(self, properties):
         settings = zc.mappingobject.mappingobject(properties)
-        self.cache_size = settings.sim_cache_size
+        self.cache_size = settings.cache_size
         self.cache = pylru.lrucache(self.cache_size)
         self.hitrates = Sample()
 
         @properties
         def changed(*a):
-            if settings.sim_cache_size != self.cache_size:
-                self.cache_size = settings.sim_cache_size
+            if settings.cache_size != self.cache_size:
+                self.cache_size = settings.cache_size
                 self.cache.size(self.cache_size)
 
     def __call__(self, environ, start_response):
@@ -122,6 +123,9 @@
         start_response('200 OK', response_headers)
         if n:
             self.hitrates.add(100.0*nhit/n)
+            if self.hitrates.n % 1000 == 0:
+                print os.getpid(), 'hitrate', self.hitrates
+
         return [result]
 
 def worker(path):
@@ -129,38 +133,27 @@
     logging.basicConfig()
     logger = logging.getLogger(__name__+'-worker')
     try:
-        import zc.resumelb.worker
-        import zc.zk
-
+        import zc.resumelb.zk
         zk = zc.zk.ZooKeeper()
-        lbpath = path + '/lb'
-        while not (zk.exists(lbpath) and zk.get_children(lbpath)):
-            time.sleep(.01)
-        [lbaddr] = zk.get_children(lbpath)
-
         properties = zk.properties(path)
-
-        class Worker(zc.resumelb.worker.Worker):
-
-            def new_resume(self, resume):
-                print '\nNEW RESUME:', len(resume), os.getpid(), time.ctime()
-                pprint(resume)
-                stats = dict(hitrate=str(app.hitrates))
-                stats.update(resume)
-                zk.set(worker_path, json.dumps(stats))
-                zc.resumelb.worker.Worker.new_resume(self, resume)
-
-        worker_path = path + '/workers/%s' % os.getpid()
-
-        zk.create(worker_path, '',
-                  zc.zk.OPEN_ACL_UNSAFE, zookeeper.EPHEMERAL)
-
         app = App(properties)
-        Worker(app, zc.parse_addr.parse_addr(lbaddr), properties)
+        resume_file = 'resume%s.mar' % os.getpid()
+        if os.path.exists(resume_file):
+            os.remove(resume_file)
+        zc.resumelb.zk.worker(
+            app, {},
+            zookeeper='127.0.0.1:2181',
+            path=path+'/lb/workers',
+            address='127.0.0.1:0',
+            resume_file=resume_file,
+            )
     except:
         logger.exception('worker')
 
 def clients(path):
+    logging.basicConfig()
+    random.seed(0)
+
     import zc.zk
     zk = zc.zk.ZooKeeper()
 
@@ -171,27 +164,25 @@
 
     @properties
     def _(*a):
-        n = settings.sim_sites
+        n = settings.sites
         siteids[:] = [0]
         for i in range(4):
             if n:
                 siteids.extend(range(n))
             n /= 2
 
-    logging.basicConfig()
+    wpath = path + '/lb/providers'
+    zope.testing.wait.wait(lambda : zk.get_children(wpath))
 
-    wpath = path + '/wsgi'
-    while not (zk.exists(wpath) and zk.get_children(wpath)):
-        time.sleep(.01)
     [waddr] = zk.get_children(wpath)
     waddr = zc.parse_addr.parse_addr(waddr)
 
     stats = zc.mappingobject.mappingobject(dict(
-        sim_truncated = 0,
-        sim_requests = 0,
-        sim_bypid = {},
-        sim_nobs = 0,
-        sim_nhits = 0,
+        truncated = 0,
+        requests = 0,
+        bypid = {},
+        nobs = 0,
+        nhits = 0,
         ))
 
     spath = path + '/stats'
@@ -203,8 +194,8 @@
     def do_request():
         siteid = random.choice(siteids)
         oids = set(
-            int(random.gauss(0, settings.sim_objects_per_site/4))
-            for i in range(settings.sim_objects_per_request)
+            int(random.gauss(0, settings.objects_per_site/4))
+            for i in range(settings.objects_per_request)
             )
         socket = gevent.socket.create_connection(waddr)
         try:
@@ -218,7 +209,7 @@
             while '\r\n\r\n' not in response:
                 data = socket.recv(9999)
                 if not data:
-                    stats.sim_truncated += 1
+                    stats.truncated += 1
                     return
                 response += data
             headers, body = response.split('\r\n\r\n')
@@ -230,22 +221,22 @@
             while len(body) < content_length:
                 data = socket.recv(9999)
                 if not data:
-                    stats.sim_truncated += 1
+                    stats.truncated += 1
                     return
                 body += data
 
             pid, n, nhit, nmiss, nevict = map(int, body.strip().split())
-            stats.sim_requests += 1
-            stats.sim_nobs += n
-            stats.sim_nhits += nhit
-            bypid = stats.sim_bypid.get(pid)
+            stats.requests += 1
+            stats.nobs += n
+            stats.nhits += nhit
+            bypid = stats.bypid.get(pid)
             if bypid is None:
-                bypid = stats.sim_bypid[pid] = dict(nr=0, n=0, nhit=0)
+                bypid = stats.bypid[pid] = dict(nr=0, n=0, nhit=0)
             bypid['nr'] += 1
             bypid['n'] += n
             bypid['nhit'] += nhit
             logger.info(' '.join(map(str, (
-                100*stats.sim_nhits/stats.sim_nobs,
+                100*stats.nhits/stats.nobs,
                 pid, n, nhit, 100*nhit/n,
                 ))))
         finally:
@@ -259,25 +250,24 @@
             print 'client error'
             logging.getLogger(__name__+'-client').exception('client')
 
-    greenlets = [gevent.spawn(client) for i in range(settings.sim_clients)]
+    greenlets = [gevent.spawn(client) for i in range(settings.clients)]
 
-    import gevent.queue, zc.resumelb.thread
-    update_queue = gevent.queue.Queue()
 
-    @properties
-    def update(*a):
-        print 'put update'
-        update_queue.put(None)
-        zc.resumelb.thread.wake_gevent()
-
-    while 1:
-        update_queue.get()
+    # Set up notification of address changes.
+    watcher = gevent.get_hub().loop.async()
+    @watcher.start
+    def _():
         print 'got update event'
-        while settings.sim_clients > len(greenlets):
+        while settings.clients > len(greenlets):
             greenlets.append(gevent.spawn(client))
-        while settings.sim_clients < len(greenlets):
+        while settings.clients < len(greenlets):
             greenlets.pop().kill()
 
+    properties(lambda a: watcher.send())
+
+    while 1:
+        gevent.sleep(60.0)
+
 request_template = """GET /%(data)s HTTP/1.1\r
 Host: %(host)s\r
 \r
@@ -285,12 +275,9 @@
 
 class LBLogger:
 
-    def __init__(self, lb, zk, path):
-        self.lb = lb
+    def __init__(self):
         self.requests = Sample()
         self.nr = self.requests.n
-        self.zk = zk
-        self.path = path
         self.then = time.time()
 
     def write(self, line):
@@ -328,57 +315,36 @@
     [path] = args
     logging.basicConfig()
 
-    random.seed(0)
+    zk = zc.zk.ZooKeeper()
+    properties = zk.properties(path)
+    settings = zc.mappingobject.mappingobject(properties)
 
+    workers = [zc.thread.Process(worker, args=(path,))
+               for i in range(settings.workers)]
+
     @zc.thread.Process(args=(path,))
     def lb(path):
         import logging
         logging.basicConfig()
         logger = logging.getLogger(__name__+'-lb')
         try:
-            import zc.resumelb.lb
-            import gevent.pywsgi
-            zk = zc.zk.ZooKeeper()
-            lb = zc.resumelb.lb.LB(
-                ('127.0.0.1', 0), zc.resumelb.lb.host_classifier,
-                settings=zk.properties(path))
-            lbpath = path + '/lb'
-            if not zk.exists(lbpath):
-                zk.create(lbpath, '', zc.zk.OPEN_ACL_UNSAFE)
-            zk.register_server(
-                lbpath, ('127.0.0.1', lb.worker_server.server_port))
-
-            wsgi_server = gevent.pywsgi.WSGIServer(
-                ('127.0.0.1', 0), lb.handle_wsgi, log=LBLogger(lb, zk, lbpath),
-                )
-            wsgi_server.start()
-            wpath = path + '/wsgi'
-            if not zk.exists(wpath):
-                zk.create(wpath, '', zc.zk.OPEN_ACL_UNSAFE)
-            zk.register_server(wpath, ('127.0.0.1', wsgi_server.server_port))
-            wsgi_server.serve_forever()
+            import zc.resumelb.zk
+            lb, server, logger = zc.resumelb.zk.lbmain(
+                ['-a127.0.0.1:0', '-l', LBLogger(),
+                 '127.0.0.1:2181', '/simul/lb'],
+                run=False)
+            logger.lb = lb
+            server.serve_forever()
         except:
             logger.exception('lb')
 
-    zk = zc.zk.ZooKeeper()
-
-    workers_path = path + '/workers'
-    if not zk.exists(workers_path):
-        zk.create(workers_path, '', zc.zk.OPEN_ACL_UNSAFE)
-
-    properties = zk.properties(path)
-    settings = zc.mappingobject.mappingobject(properties)
-
-    workers = [zc.thread.Process(worker, args=(path,))
-               for i in range(settings.sim_workers)]
-
     clients_process = zc.thread.Process(clients, args=(path,))
 
     @properties
     def update(*a):
-        while settings.sim_workers > len(workers):
+        while settings.workers > len(workers):
             workers.append(zc.thread.Process(worker, args=(path,)))
-        while settings.sim_workers < len(workers):
+        while settings.workers < len(workers):
             workers.pop().terminate()
 
     threading.Event().wait() # sleep forever



More information about the checkins mailing list