[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Refactored the way settings are updated. Now have an update_settings
Jim Fulton
jim at zope.com
Sun Feb 26 16:56:12 UTC 2012
Log message for revision 124477:
Refactored the way settings are updated. Now have an update_settings
api in lbs and workers and have zk handlers call those, rather than
passing mutable settings objects to constructors. This allows
settings to be handled as simple object attrs, simplifying their use.
Changed:
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
U Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
U Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
U Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
U Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
U Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2012-02-26 16:56:11 UTC (rev 124477)
@@ -7,7 +7,6 @@
import logging
import sys
import webob
-import zc.mappingobject
import zc.resumelb.util
block_size = 1<<16
@@ -26,12 +25,12 @@
class LB:
def __init__(self, worker_addrs, classifier,
- settings=None,
disconnect_message=default_disconnect_message,
- ):
+ **pool_settings):
self.classifier = classifier
self.disconnect_message = disconnect_message
- self.pool = Pool(settings)
+ self.pool = Pool(**pool_settings)
+ self.update_settings = self.pool.update_settings
self.workletts = {}
self.set_worker_addrs(worker_addrs)
@@ -88,15 +87,24 @@
class Pool:
- def __init__(self, settings=None):
- if settings is None:
- settings = {}
- self.settings = settings
+ def __init__(self, redundancy=1, max_backlog=40, min_score=1.0,
+ unskilled_score=None):
+ self.redundancy = redundancy
+ self.max_backlog = max_backlog
+ self.min_score = min_score
+ self.unskilled_score = unskilled_score
+
self.workers = set()
self.unskilled = llist.dllist()
self.skilled = {} # rclass -> {(score, workers)}
self.event = gevent.event.Event()
+ def update_settings(self, settings):
+ for name in ('redundancy', 'max_backlog',
+ 'min_score', 'unskilled_score'):
+ if name in settings:
+ setattr(self, name, settings[name])
+
def __repr__(self):
outl = []
out = outl.append
@@ -123,9 +131,8 @@
unskilled = self.unskilled
workers = self.workers
- target_skills_per_worker = 1 + (
- self.settings.get('redundancy', 1) * len(skilled) /
- (len(workers) or 1))
+ target_skills_per_worker = (
+ 1 + self.redundancy * len(skilled) / (len(workers) or 1))
if worker in workers:
for rclass, score in worker.resume.iteritems():
@@ -169,8 +176,8 @@
return None
# Look for a skilled worker
- max_backlog = self.settings.get('max_backlog', 40)
- min_score = self.settings.get('min_score', 1.0)
+ max_backlog = self.max_backlog
+ min_score = self.min_score
best_score = 0
best_worker = None
skilled = self.skilled.get(rclass)
@@ -227,7 +234,7 @@
# - The score will be higher than some the existing, so it'll
# get work
# We also allow for an unskilled_score setting to override.
- score = self.settings.get('unskilled_score', min_score)
+ score = self.unskilled_score or self.min_score
best_worker.resume[rclass] = score
skilled.add((score, best_worker))
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2012-02-26 16:56:11 UTC (rev 124477)
@@ -17,8 +17,7 @@
to be managed in real time.
>>> import zc.resumelb.lb
- >>> settings = {}
- >>> pool = zc.resumelb.lb.Pool(settings)
+ >>> pool = zc.resumelb.lb.Pool()
The get method is used to get a worker from the pool. A request class
and an optional timeout is passed. (The timeout is mainly useful for
@@ -109,7 +108,7 @@
backlog. Also, its score is greater than the min score, which
defaults to 1.0. Let's reduce the maximum backlog to 5:
- >>> settings['max_backlog'] = 5
+ >>> pool.max_backlog = 5
So now, w1 has reached its maximum backlog. If
we make another foo request, we'll start using w3, and when that's
@@ -291,7 +290,7 @@
Now we have 7 request classes and 4 workers. If we set redundancy to
3, then the expected resume size is 6, so::
- >>> settings['redundancy'] = 3
+ >>> pool.redundancy = 3
>>> pool.new_resume(w4, dict((str(i), float(i)) for i in range(9)))
>>> pool
@@ -328,7 +327,7 @@
but set the min_score to 4:
- >>> settings['min_score'] = 4.0
+ >>> pool.min_score = 4.0
And then get a worker for foo:
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2012-02-26 16:56:11 UTC (rev 124477)
@@ -10,7 +10,6 @@
import os
import sys
import time
-import zc.mappingobject
import zc.resumelb.util
logger = logging.getLogger(__name__)
@@ -25,10 +24,10 @@
class Worker:
- def __init__(self, app, addr, settings,
+ def __init__(self, app, addr, history=999,
resume_file=None, threads=None, tracelog=None):
self.app = app
- self.settings = zc.mappingobject.mappingobject(settings)
+ self.history = history
self.worker_request_number = 0
self.resume_file = resume_file
self.resume = {}
@@ -107,6 +106,10 @@
self.server.start()
self.addr = addr[0], self.server.server_port
+ def update_settings(self, data):
+ if 'history' in data:
+ self.history = data['history']
+
def stop(self):
self.server.stop()
if hasattr(self, 'threadpool'):
@@ -181,7 +184,7 @@
elapsed = max(time.time() - env['zc.resumelb.time'], 1e-9)
time_ring = self.time_ring
- time_ring_pos = rno % self.settings.history
+ time_ring_pos = rno % self.history
rclass = env['zc.resumelb.request_class']
try:
time_ring[time_ring_pos] = rclass, elapsed
@@ -191,7 +194,7 @@
worker_request_number = self.worker_request_number + 1
self.worker_request_number = worker_request_number
- if worker_request_number % self.settings.history == 0:
+ if worker_request_number % self.history == 0:
byrclass = {}
for rclass, elapsed in time_ring:
sumn = byrclass.get(rclass)
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.test 2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.test 2012-02-26 16:56:11 UTC (rev 124477)
@@ -8,7 +8,7 @@
>>> import zc.resumelb.worker, zc.resumelb.tests
>>> worker = zc.resumelb.worker.Worker(
- ... zc.resumelb.tests.app(), ('127.0.0.1', 0), dict(history=5))
+ ... zc.resumelb.tests.app(), ('127.0.0.1', 0), history=5)
Here we created a worker using a test application, giving it
an address to listen on and telling it to update its resume after every
@@ -375,7 +375,7 @@
... marshal.dump(dict(a=1.0, b=2.0), f)
>>> worker = zc.resumelb.worker.Worker(
- ... zc.resumelb.tests.app(), ('127.0.0.1', 0), dict(history=2),
+ ... zc.resumelb.tests.app(), ('127.0.0.1', 0), history=2,
... resume_file='resume.mar')
>>> from pprint import pprint
@@ -502,7 +502,7 @@
>>> with mock.patch('datetime.datetime') as dtmock:
... dtmock.now.side_effect = lambda : now
... worker = zc.resumelb.worker.Worker(
- ... zc.resumelb.tests.app(), ('127.0.0.1', 0), dict(history=2),
+ ... zc.resumelb.tests.app(), ('127.0.0.1', 0), history=2,
... tracelog='tracelog', threads=1)
>>> worker_socket = gevent.socket.create_connection(worker.addr)
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py 2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py 2012-02-26 16:56:11 UTC (rev 124477)
@@ -32,11 +32,20 @@
zk = zc.zk.ZooKeeper(zookeeper)
address = zc.parse_addr.parse_addr(address)
from zc.resumelb.worker import Worker
- worker = Worker(app, address, zk.properties(path),
- threads=threads and int(threads),
+
+ worker = Worker(app, address, threads=threads and int(threads),
**kw)
+
+ # Set up notification of settings changes.
+ settings = zk.properties(path)
+ watcher = gevent.get_hub().loop.async()
+ watcher.start(lambda : worker.update_settings(settings))
+ settings(lambda _: watcher.send())
+
zk.register_server(path+'/providers', worker.addr)
worker.zk = zk
+ worker.__zksettings = settings
+
if run:
try:
worker.server.serve_forever()
@@ -110,7 +119,6 @@
zk = zc.zk.ZooKeeper(zookeeper)
- settings = zk.properties(path)
addrs = zk.children(path+'/workers/providers')
rcmod, rcexpr = options.request_classifier.split(':')
__import__(rcmod)
@@ -126,16 +134,24 @@
from zc.resumelb.lb import LB
lb = LB(map(zc.parse_addr.parse_addr, addrs),
- request_classifier, settings, disconnect_message)
- lb.zk = zk
+ request_classifier, disconnect_message)
# Set up notification of address changes.
- watcher = gevent.get_hub().loop.async()
- @watcher.start
+ awatcher = gevent.get_hub().loop.async()
+ @awatcher.start
def _():
lb.set_worker_addrs(map(zc.parse_addr.parse_addr, addrs))
- addrs(lambda a: watcher.send())
+ addrs(lambda a: awatcher.send())
+ # Set up notification of settings changes.
+ settings = zk.properties(path)
+ swatcher = gevent.get_hub().loop.async()
+ swatcher.start(lambda : lb.update_settings(settings))
+ settings(lambda a: swatcher.send())
+
+ lb.zk = zk
+ lb.__zk = addrs, settings
+
# Now, start a wsgi server
addr = zc.parse_addr.parse_addr(options.address)
if options.max_connections:
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.test 2012-02-25 19:35:34 UTC (rev 124476)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.test 2012-02-26 16:56:11 UTC (rev 124477)
@@ -72,7 +72,7 @@
The worker got its settings from the tree:
- >>> worker.settings.history
+ >>> worker.history
999
It loaded its resume from resume.mar:
More information about the checkins
mailing list