[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Refactored the way settings are updated. Now have an update_settings

Jim Fulton jim at zope.com
Sun Feb 26 16:56:12 UTC 2012


Log message for revision 124477:
  Refactored the way settings are updated.  Now have an update_settings
  api in lbs and workers and have zk handlers call those, rather than
  passing mutable settings objects to constructors.  This allows
  settings to be handled as simple object attrs, simplifying their use.
  

Changed:
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/zk.test

-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-02-26 16:56:11 UTC (rev 124477)
@@ -7,7 +7,6 @@
 import logging
 import sys
 import webob
-import zc.mappingobject
 import zc.resumelb.util
 
 block_size = 1<<16
@@ -26,12 +25,12 @@
 class LB:
 
     def __init__(self, worker_addrs, classifier,
-                 settings=None,
                  disconnect_message=default_disconnect_message,
-                 ):
+                 **pool_settings):
         self.classifier = classifier
         self.disconnect_message = disconnect_message
-        self.pool = Pool(settings)
+        self.pool = Pool(**pool_settings)
+        self.update_settings = self.pool.update_settings
         self.workletts = {}
         self.set_worker_addrs(worker_addrs)
 
@@ -88,15 +87,24 @@
 
 class Pool:
 
-    def __init__(self, settings=None):
-        if settings is None:
-            settings = {}
-        self.settings = settings
+    def __init__(self, redundancy=1, max_backlog=40, min_score=1.0,
+                 unskilled_score=None):
+        self.redundancy = redundancy
+        self.max_backlog = max_backlog
+        self.min_score = min_score
+        self.unskilled_score = unskilled_score
+
         self.workers = set()
         self.unskilled = llist.dllist()
         self.skilled = {}   # rclass -> {(score, workers)}
         self.event = gevent.event.Event()
 
+    def update_settings(self, settings):
+        for name in ('redundancy', 'max_backlog',
+                     'min_score', 'unskilled_score'):
+            if name in settings:
+                setattr(self, name, settings[name])
+
     def __repr__(self):
         outl = []
         out = outl.append
@@ -123,9 +131,8 @@
         unskilled = self.unskilled
         workers = self.workers
 
-        target_skills_per_worker = 1 + (
-            self.settings.get('redundancy', 1) * len(skilled) /
-            (len(workers) or 1))
+        target_skills_per_worker = (
+            1 + self.redundancy * len(skilled) / (len(workers) or 1))
 
         if worker in workers:
             for rclass, score in worker.resume.iteritems():
@@ -169,8 +176,8 @@
                 return None
 
         # Look for a skilled worker
-        max_backlog = self.settings.get('max_backlog', 40)
-        min_score = self.settings.get('min_score', 1.0)
+        max_backlog = self.max_backlog
+        min_score = self.min_score
         best_score = 0
         best_worker = None
         skilled = self.skilled.get(rclass)
@@ -227,7 +234,7 @@
                 # - The score will be higher than some the existing, so it'll
                 #   get work
                 # We also allow for an unskilled_score setting to override.
-                score = self.settings.get('unskilled_score', min_score)
+                score = self.unskilled_score or self.min_score
                 best_worker.resume[rclass] = score
                 skilled.add((score, best_worker))
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test	2012-02-26 16:56:11 UTC (rev 124477)
@@ -17,8 +17,7 @@
 to be managed in real time.
 
     >>> import zc.resumelb.lb
-    >>> settings = {}
-    >>> pool = zc.resumelb.lb.Pool(settings)
+    >>> pool = zc.resumelb.lb.Pool()
 
 The get method is used to get a worker from the pool.  A request class
 and an optional timeout is passed. (The timeout is mainly useful for
@@ -109,7 +108,7 @@
 backlog.  Also, it's score is greater than the min score, which
 defaults to 1.0.  Let's reduce the maximum backlog to 5:
 
-    >>> settings['max_backlog'] = 5
+    >>> pool.max_backlog = 5
 
 So now, w1 has reached it's maximum backlog.  If
 we make another foo request, we'll start using w3, and when that's
@@ -291,7 +290,7 @@
 Now we have 7 request classes and 4 workers.  If we set redundancy to
 3, then the expected resume size is 6, so::
 
-    >>> settings['redundancy'] = 3
+    >>> pool.redundancy = 3
     >>> pool.new_resume(w4, dict((str(i), float(i)) for i in range(9)))
 
     >>> pool
@@ -328,7 +327,7 @@
 
 but set the min_score to 4:
 
-    >>> settings['min_score'] = 4.0
+    >>> pool.min_score = 4.0
 
 And the get a worker for foo:
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-02-26 16:56:11 UTC (rev 124477)
@@ -10,7 +10,6 @@
 import os
 import sys
 import time
-import zc.mappingobject
 import zc.resumelb.util
 
 logger = logging.getLogger(__name__)
@@ -25,10 +24,10 @@
 
 class Worker:
 
-    def __init__(self, app, addr, settings,
+    def __init__(self, app, addr, history=999,
                  resume_file=None, threads=None, tracelog=None):
         self.app = app
-        self.settings = zc.mappingobject.mappingobject(settings)
+        self.history = history
         self.worker_request_number = 0
         self.resume_file = resume_file
         self.resume = {}
@@ -107,6 +106,10 @@
         self.server.start()
         self.addr = addr[0], self.server.server_port
 
+    def update_settings(self, data):
+        if 'history' in data:
+            self.history = data['history']
+
     def stop(self):
         self.server.stop()
         if hasattr(self, 'threadpool'):
@@ -181,7 +184,7 @@
 
                 elapsed = max(time.time() - env['zc.resumelb.time'], 1e-9)
                 time_ring = self.time_ring
-                time_ring_pos = rno % self.settings.history
+                time_ring_pos = rno % self.history
                 rclass = env['zc.resumelb.request_class']
                 try:
                     time_ring[time_ring_pos] = rclass, elapsed
@@ -191,7 +194,7 @@
 
                 worker_request_number = self.worker_request_number + 1
                 self.worker_request_number = worker_request_number
-                if worker_request_number % self.settings.history == 0:
+                if worker_request_number % self.history == 0:
                     byrclass = {}
                     for rclass, elapsed in time_ring:
                         sumn = byrclass.get(rclass)

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2012-02-26 16:56:11 UTC (rev 124477)
@@ -8,7 +8,7 @@
 
     >>> import zc.resumelb.worker, zc.resumelb.tests
     >>> worker = zc.resumelb.worker.Worker(
-    ...   zc.resumelb.tests.app(), ('127.0.0.1', 0), dict(history=5))
+    ...   zc.resumelb.tests.app(), ('127.0.0.1', 0), history=5)
 
 Here we created a worker using a test application, telling it to
 an address to listen on and to update it's resume after every
@@ -375,7 +375,7 @@
     ...     marshal.dump(dict(a=1.0, b=2.0), f)
 
     >>> worker = zc.resumelb.worker.Worker(
-    ...   zc.resumelb.tests.app(), ('127.0.0.1', 0), dict(history=2),
+    ...   zc.resumelb.tests.app(), ('127.0.0.1', 0), history=2,
     ...   resume_file='resume.mar')
 
     >>> from pprint import pprint
@@ -502,7 +502,7 @@
     >>> with mock.patch('datetime.datetime') as dtmock:
     ...     dtmock.now.side_effect = lambda : now
     ...     worker = zc.resumelb.worker.Worker(
-    ...       zc.resumelb.tests.app(), ('127.0.0.1', 0), dict(history=2),
+    ...       zc.resumelb.tests.app(), ('127.0.0.1', 0), history=2,
     ...       tracelog='tracelog', threads=1)
 
     >>> worker_socket = gevent.socket.create_connection(worker.addr)

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-02-26 16:56:11 UTC (rev 124477)
@@ -32,11 +32,20 @@
     zk = zc.zk.ZooKeeper(zookeeper)
     address = zc.parse_addr.parse_addr(address)
     from zc.resumelb.worker import Worker
-    worker = Worker(app, address, zk.properties(path),
-                    threads=threads and int(threads),
+
+    worker = Worker(app, address, threads=threads and int(threads),
                     **kw)
+
+    # Set up notification of settings changes.
+    settings = zk.properties(path)
+    watcher = gevent.get_hub().loop.async()
+    watcher.start(lambda : worker.update_settings(settings))
+    settings(lambda _: watcher.send())
+
     zk.register_server(path+'/providers', worker.addr)
     worker.zk = zk
+    worker.__zksettings = settings
+
     if run:
         try:
             worker.server.serve_forever()
@@ -110,7 +119,6 @@
 
 
     zk = zc.zk.ZooKeeper(zookeeper)
-    settings = zk.properties(path)
     addrs = zk.children(path+'/workers/providers')
     rcmod, rcexpr = options.request_classifier.split(':')
     __import__(rcmod)
@@ -126,16 +134,24 @@
 
     from zc.resumelb.lb import LB
     lb = LB(map(zc.parse_addr.parse_addr, addrs),
-            request_classifier, settings, disconnect_message)
-    lb.zk = zk
+            request_classifier, disconnect_message)
 
     # Set up notification of address changes.
-    watcher = gevent.get_hub().loop.async()
-    @watcher.start
+    awatcher = gevent.get_hub().loop.async()
+    @awatcher.start
     def _():
         lb.set_worker_addrs(map(zc.parse_addr.parse_addr, addrs))
-    addrs(lambda a: watcher.send())
+    addrs(lambda a: awatcher.send())
 
+    # Set up notification of address changes.
+    settings = zk.properties(path)
+    swatcher = gevent.get_hub().loop.async()
+    swatcher.start(lambda : lb.update_settings(settings))
+    settings(lambda a: swatcher.send())
+
+    lb.zk = zk
+    lb.__zk = addrs, settings
+
     # Now, start a wsgi server
     addr = zc.parse_addr.parse_addr(options.address)
     if options.max_connections:

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.test	2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.test	2012-02-26 16:56:11 UTC (rev 124477)
@@ -72,7 +72,7 @@
 
 The worker got it's settings from the tree:
 
-    >>> worker.settings.history
+    >>> worker.history
     999
 
 It loaded it's resume from resume.mar:



More information about the checkins mailing list