[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Replaced the lb algorithm with one that computes a maximum backlog
Jim Fulton
jim at zope.com
Sun Mar 4 20:21:22 UTC 2012
Log message for revision 124506:
Replaced the lb algorithm with one that computes a maximum backlog
based on variance from the pool mean backlog, with backlogs and mean
backlogs being smoothed using a moving average based on decayed sums
and counts.
Changed:
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
U Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
U Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2012-03-04 20:21:20 UTC (rev 124506)
@@ -87,20 +87,35 @@
class Pool:
- def __init__(self, max_backlog=40, unskilled_score=1.0):
- self.max_backlog = max_backlog
+ def __init__(self, unskilled_score=1.0, variance=4.0, backlog_history=9):
self.unskilled_score = unskilled_score
+ self.variance = variance
+ self.backlog_history = backlog_history
+ self._update_worker_decay()
self.workers = set()
+ self.nworkers = 0
self.unskilled = llist.dllist()
self.skilled = {} # rclass -> {[(score, workers)]}
self.event = gevent.event.Event()
+ _init_backlog(self)
def update_settings(self, settings):
- for name in ('max_backlog', 'unskilled_score'):
+ for name in ('unskilled_score', 'variance', 'backlog_history'):
if name in settings:
setattr(self, name, settings[name])
+ if 'backlog_history' in settings:
+ self._update_worker_decay()
+ self._update_decay()
+
+ def _update_decay(self):
+ if self.nworkers:
+ self.decay = 1.0 - 1.0/(self.backlog_history*2*self.nworkers)
+
+ def _update_worker_decay(self):
+ self.worker_decay = 1.0 - 1.0/(self.backlog_history*2)
+
def __repr__(self):
outl = []
out = outl.append
@@ -110,11 +125,15 @@
% (rclass,
', '.join(
'%s(%s,%s)' %
- (worker, score, worker.backlog)
+ (worker, score, worker.mbacklog)
for (score, worker) in sorted(skilled)
))
)
out('Backlogs:')
+ out(" overall backlog: %s Decayed: %s Avg: %s" % (
+ self.backlog, self.mbacklog,
+ (self.mbacklog / self.nworkers) if self.nworkers else None
+ ))
backlogs = {}
for worker in self.workers:
backlogs.setdefault(worker.backlog, []).append(worker)
@@ -122,24 +141,20 @@
out(' %s: %r' % (backlog, sorted(workers)))
return '\n'.join(outl)
- def new_resume(self, worker, resume=None):
+ def new_resume(self, worker, resume):
skilled = self.skilled
- unskilled = self.unskilled
workers = self.workers
if worker in workers:
for rclass, score in worker.resume.iteritems():
skilled[rclass].remove((score, worker))
- if resume is None:
- workers.remove(worker)
- if worker.lnode is not None:
- unskilled.remove(worker.lnode)
- worker.lnode = None
- return
else:
- worker.backlog = 0
+ _init_backlog(worker)
workers.add(worker)
- worker.lnode = unskilled.appendleft(worker)
+ self.nworkers = len(self.workers)
+ self._update_decay()
+ worker.lnode = self.unskilled.appendleft(worker)
+ self.event.set()
worker.resume = resume
for rclass, score in resume.iteritems():
@@ -148,11 +163,21 @@
except KeyError:
skilled[rclass] = set(((score, worker), ))
- if unskilled:
- self.event.set()
+ logger.info('new resume\n%s', self)
def remove(self, worker):
- self.new_resume(worker)
+ skilled = self.skilled
+ for rclass, score in worker.resume.iteritems():
+ skilled[rclass].remove((score, worker))
+ if getattr(worker, 'lnode', None) is not None:
+ self.unskilled.remove(worker.lnode)
+ worker.lnode = None
+ self.workers.remove(worker)
+ self.nworkers = len(self.workers)
+ if self.nworkers:
+ self._update_decay()
+ else:
+ self.event.clear()
def get(self, rclass, timeout=None):
"""Get a worker to handle a request class
@@ -164,61 +189,59 @@
return None
# Look for a skilled worker
- max_backlog = self.max_backlog
best_score = 0
best_worker = None
skilled = self.skilled.get(rclass)
if skilled is None:
skilled = self.skilled[rclass] = set()
+
+ max_backlog = self.variance * max(self.mbacklog / self.nworkers, 2)
for score, worker in skilled:
+ if worker.mbacklog > max_backlog:
+ continue
backlog = worker.backlog + 1
- if backlog > max_backlog:
- continue
score /= backlog
if (score > best_score):
best_score = score
best_worker = worker
if not best_score:
- while unskilled.first.value.backlog >= max_backlog:
- # Edge case. max_backlog was reduced after a worker
- # with a larger backlog was added.
- #import pdb; pdb.set_trace()
- unskilled.first.value.lnode = None
- unskilled.popleft()
- if not unskilled:
- # OK, now we need to wait. Just start over.
- return self.get(rclass, timeout)
-
best_worker = unskilled.first.value
if rclass not in best_worker.resume:
-
# We now have an unskilled worker and we need to
# assign it a score.
- # - We want to give it work somewhat gradually.
- # - We got here because:
- # - there are no skilled workers,
- # - The skilled workers have all reached their max backlog
score = self.unskilled_score
best_worker.resume[rclass] = score
skilled.add((score, best_worker))
+ # Move worker from lru to mru end of queue
unskilled.remove(best_worker.lnode)
+ best_worker.lnode = unskilled.append(best_worker)
+
best_worker.backlog += 1
- if best_worker.backlog < max_backlog:
- best_worker.lnode = unskilled.append(best_worker)
- else:
- best_worker.lnode = None
+ _decay_backlog(best_worker, self.worker_decay)
+ self.backlog += 1
+ _decay_backlog(self, self.decay)
+
return best_worker
def put(self, worker):
- if worker.lnode is None:
- worker.lnode = self.unskilled.append(worker)
- self.event.set()
- if worker.backlog:
+ self.backlog -= 1
+ assert self.backlog >= 0, self.backlog
+ _decay_backlog(self, self.decay)
+ if worker.backlog > 0:
worker.backlog -= 1
+ _decay_backlog(worker, self.worker_decay)
+def _init_backlog(worker):
+ worker.backlog = worker.nbacklog = worker.dbacklog = worker.mbacklog = 0
+
+def _decay_backlog(worker, decay):
+ worker.dbacklog = worker.dbacklog*decay + worker.backlog
+ worker.nbacklog = worker.nbacklog*decay + 1
+ worker.mbacklog = worker.dbacklog / worker.nbacklog
+
class Worker(zc.resumelb.util.Worker):
maxrno = (1<<32) - 1
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2012-03-04 20:21:20 UTC (rev 124506)
@@ -211,12 +211,13 @@
Workers use their addresses as their reprs.
- >>> print lb.pool
+ >>> print lb.pool # doctest: +ELLIPSIS
Request classes:
- h1.com: 127.0.0.1:65027(10.0,0)
- h2.com: 127.0.0.1:65028(10.0,0)
+ h1.com: 127.0.0.1:0(10.0,0.970...)
+ h2.com: 127.0.0.1:0(10.0,0.485...)
Backlogs:
- 0: [127.0.0.1:65028, 127.0.0.1:65027]
+ overall backlog: 0 Decayed: 1.47809138734 Avg: 0.73904569367
+ 0: [127.0.0.1:0, 127.0.0.1:0]
Worker disconnection
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2012-03-04 20:21:20 UTC (rev 124506)
@@ -15,8 +15,27 @@
We'll test the pool with stand-ins for the local workers.
>>> import zc.resumelb.lb
- >>> pool = zc.resumelb.lb.Pool()
+ >>> pool = zc.resumelb.lb.Pool(
+ ... variance=2, backlog_history=2, unskilled_score=.5)
+We specified a number of optional parameters that we'll see in action
+later:
+
+variance
+ How many times the pool mean backlog we'll let a worker's backlog
+ rise before we look for another worker. The default is 4.
+
+backlog_history
+ The (approximate) number of requests to include in the mean backlog
+ for a worker. The default is 9.
+
+ The pool also has a backlog history, which is the product of the
+ worker backlog_history and the number of workers.
+
+unskilled_score
+ The score to assign to unskilled workers. The default is 1.
+
+
The get method is used to get a worker from the pool. A request class
and an optional timeout is passed. (The timeout is mainly useful for
testing.)
@@ -64,20 +83,24 @@
We're gonna be white box and look at the pool data structures from
time to time.
- >>> pool
+ >>> pool # doctest: +ELLIPSIS
Request classes:
- bar: w2(1.0,1)
- foo: w1(1.0,2)
+ bar: w2(0.5,1.0)
+ foo: w1(0.5,1.57...)
Backlogs:
+ overall backlog: 3 Decayed: 2.08... Avg: 1.04...
1: [w2]
2: [w1]
Here, we can see that w1 is used for the foo class and w2 for the bar
-class. In the request classes, the worker's score and it's overall
-backlog if shown in paretheses. We see that both workers have a score
+class. In the request classes, the worker's score and its decayed
+backlog are shown in parentheses. We see that both workers have a score
of 1.0. This is the default score for new workers. We'll say more
about this later.
+The decayed backlog, for the pool, and for workers, is an "average"
+backlog over a backlog history, which is a configuration parameter.
+
Let's add another worker:
>>> w3 = Worker('w3')
@@ -85,114 +108,133 @@
and make some more foo requests:
- >>> [pool.get('foo') for i in range(3)]
- [w1, w1, w1]
+ >>> [pool.get('foo') for i in range(4)]
+ [w1, w1, w1, w1]
- >>> pool
+ >>> pool # doctest: +ELLIPSIS
Request classes:
- bar: w2(1.0,1)
- foo: w1(1.0,5)
+ bar: w2(0.5,1.0)
+ foo: w1(0.5,4.29907929908)
Backlogs:
+ overall backlog: 7 Decayed: 4.39137888071 Avg: 1.46379296024
0: [w3]
1: [w2]
- 5: [w1]
+ 6: [w1]
Even though we still had a worker with no backlog, we kept sending
-requests to w1. This is because w1 hasn't reached it's maximum
-backlog. Also, it's score is greater than the min score, which
-defaults to 1.0. Let's reduce the maximum backlog to 5:
+requests to w1. This is because w1 hasn't reached the maximum
+backlog, which is:
- >>> pool.max_backlog = 5
+ variance * max(2, pool_mean_backlog)
So now, w1 has reached it's maximum backlog. If
-we make another foo request, we'll start using w3, and when that's
-reached it's maximum backlog, we'll start using w2:
+we make another foo request, we'll start using w3:
- >>> [pool.get('foo') for i in range(7)]
- [w3, w3, w3, w3, w3, w2, w2]
+ >>> [pool.get('foo') for i in range(10)]
+ [w3, w3, w3, w3, w3, w3, w1, w3, w1, w3]
- >>> pool
+ >>> pool # doctest: +ELLIPSIS
Request classes:
- bar: w2(1.0,3)
- foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
+ bar: w2(0.5,1.0)
+ foo: w1(0.5,5.89000423908), w3(0.5,5.89000423908)
Backlogs:
- 3: [w2]
- 5: [w1, w3]
+ overall backlog: 17 Decayed: 11.0516220545 Avg: 3.68387401818
+ 1: [w2]
+ 8: [w1, w3]
-If we get all workers to the maximum backlog, we'll block until a
-worker is free.
+Something interesting happened here. After several requests, the
+pool switched back and forth between w1 and w3. It never switched to
+w2. This is because, as the backlogs for w1 and w3 increased, so did
+the mean backlog. This is useful, because it prevents a single
+request class from taking over the entire pool. Let's see what
+happens when we add some new workers:
+ >>> w4 = Worker('w4')
+ >>> pool.new_resume(w4, {})
+ >>> w5 = Worker('w5')
+ >>> pool.new_resume(w5, {})
+
+ >>> pool # doctest: +ELLIPSIS
+ Request classes:
+ bar: w2(0.5,1.0)
+ foo: w1(0.5,5.89000423908), w3(0.5,5.89000423908)
+ Backlogs:
+ overall backlog: 17 Decayed: 11.0516220545 Avg: 2.21032441091
+ 0: [w4, w5]
+ 1: [w2]
+ 8: [w1, w3]
+
>>> [pool.get('foo') for i in range(2)]
- [w2, w2]
+ [w5, w5]
- >>> pool.get('foo', 0.0)
+The new workers caused the overall mean to drop and when we got a foo
+request, it went to the newest new worker. The subsequent request
+went to that worker because it was skilled and had a fairly low backlog.
When a worker is done doing it's work, we put it back in the pool:
- >>> pool.put(w1)
- >>> pool.put(w1)
- >>> pool.put(w1)
- >>> pool.put(w2)
- >>> pool.put(w3)
- >>> pool.put(w3)
- >>> pool
+ >>> for i in range(8):
+ ... pool.put(w1)
+ >>> for i in range(8):
+ ... pool.put(w3)
+
+ >>> pool # doctest: +ELLIPSIS
Request classes:
- bar: w2(1.0,4)
- foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
+ bar: w2(0.5,1.0)
+ foo: w1(0.5,2.45398560273), w3(0.5,2.45398560273), w5(0.5,1.57142857143)
Backlogs:
- 2: [w1]
- 3: [w3]
- 4: [w2]
+ overall backlog: 3 Decayed: 10.2983868199 Avg: 2.05967736397
+ 0: [w1, w3, w4]
+ 1: [w2]
+ 2: [w5]
Now, when we get a worker, we'll get w1.
>>> pool.get('foo', 0.0)
w1
-Why? We adjust each score by the worker's backlog, so even though all
-2 workers had the same score, w1 is chosen because it has the smallest
-backlog.
-
Now that we've done some work, let's update the resumes. This will
normally be done by workers periodically, after collecting performance
data.
>>> pool.new_resume(w1, {'foo': 6.0})
-
>>> pool.new_resume(w2, {'bar': 2.0, 'foo': 2.0})
-
>>> pool.new_resume(w3, {'foo': 3.8})
- >>> pool
+ >>> pool # doctest: +ELLIPSIS
Request classes:
- bar: w2(2.0,4)
- foo: w2(2.0,4), w3(3.8,3), w1(6.0,3)
+ bar: w2(2.0,1.0)
+ foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,2.45...), w1(6.0,2.08...)
Backlogs:
- 3: [w1, w3]
- 4: [w2]
+ overall backlog: 4 Decayed: 9.90317056202 Avg: 1.9806341124
+ 0: [w3, w4]
+ 1: [w1, w2]
+ 2: [w5]
>>> pool.get('foo')
+ w3
+ >>> pool.get('foo')
w1
>>> pool.get('foo')
w1
- >>> pool
+ >>> pool # doctest: +ELLIPSIS
Request classes:
- bar: w2(2.0,4)
- foo: w2(2.0,4), w3(3.8,3), w1(6.0,5)
+ bar: w2(2.0,1.0)
+ foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,2.08...), w1(6.0,2.30...)
Backlogs:
- 3: [w3]
- 4: [w2]
- 5: [w1]
+ overall backlog: 7 Decayed: 9.23495653237 Avg: 1.84699130647
+ 0: [w4]
+ 1: [w2, w3]
+ 2: [w5]
+ 3: [w1]
-Because w1 has reached the maximum backlog, it's out of the running.
-
>>> pool.get('foo')
w3
>>> pool.get('foo')
+ w1
+ >>> pool.get('foo')
w3
- >>> pool.get('foo')
- w2
>>> pool.put(w1)
>>> pool.put(w3)
@@ -200,37 +242,20 @@
>>> pool.put(w3)
>>> pool.put(w3)
>>> pool.put(w3)
- >>> pool
+ >>> pool # doctest: +ELLIPSIS
Request classes:
- bar: w2(2.0,5)
- foo: w2(2.0,5), w3(3.8,0), w1(6.0,4)
+ bar: w2(2.0,1.0)
+ foo: w5(0.5,1.57...), w2(2.0,1.0), w3(3.8,1.43...), w1(6.0,2.79...)
Backlogs:
- 0: [w3]
- 4: [w1]
- 5: [w2]
+ overall backlog: 4 Decayed: 8.35... Avg: 1.67...
+ 0: [w3, w4]
+ 1: [w2]
+ 2: [w5]
+ 3: [w1]
>>> [pool.get('foo') for i in range(5)]
- [w3, w3, w3, w1, w3]
+ [w3, w3, w1, w3, w1]
-Pool settings
-=============
-
-There are several settings that effect pools:
-
-max_backlog
- Maximum worker backlog, defaulting to 40.
-
-min_score
- A worker won't be used if it has a backlog greater than 1 and it's
- score is less than min_score.
-
-unskilled_score
- The score assigned to workers when given a new request class. This
- defaults to min_score.
-
-We've already seen max_backlog at work. Let's test the other
-settings.
-
Worker disconnect
=================
@@ -238,9 +263,37 @@
>>> pool.remove(w1)
>>> pool.remove(w3)
- >>> pool
+ >>> pool # doctest: +ELLIPSIS
Request classes:
- bar: w2(2.0,5)
- foo: w2(2.0,5)
+ bar: w2(2.0,1.0)
+ foo: w5(0.5,1.57142857143), w2(2.0,1.0)
Backlogs:
- 5: [w2]
+ overall backlog: 9 Decayed: 8.043843598 Avg: 2.68128119933
+ 0: [w4]
+ 1: [w2]
+ 2: [w5]
+
+Updating worker settings
+========================
+
+ >>> pool = zc.resumelb.lb.Pool()
+ >>> pool.new_resume(w1, {'foo': 6.0})
+ >>> pool.new_resume(w2, {'bar': 2.0, 'foo': 2.0})
+ >>> pool.new_resume(w3, {'foo': 3.8})
+ >>> pool.variance, pool.backlog_history, pool.unskilled_score
+ (4.0, 9, 1.0)
+ >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+ (0.944..., 0.981...)
+
+ >>> pool.update_settings(dict(variance=2.0))
+ >>> pool.variance, pool.backlog_history, pool.unskilled_score
+ (2.0, 9, 1.0)
+ >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+ (0.944..., 0.981...)
+
+ >>> pool.update_settings(
+ ... dict(variance=3.0, backlog_history=6, unskilled_score=.25))
+ >>> pool.variance, pool.backlog_history, pool.unskilled_score
+ (3.0, 6, 0.25)
+ >>> pool.worker_decay, pool.decay # doctest: +ELLIPSIS
+ (0.916..., 0.972...)
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py 2012-03-04 14:15:24 UTC (rev 124505)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py 2012-03-04 20:21:20 UTC (rev 124506)
@@ -25,6 +25,7 @@
threads=None, backdoor=False, run=True, **kw):
"""Paste deploy server runner
"""
+ # XXX support log level
if loggers:
import ZConfig
ZConfig.configureLoggers(loggers)
@@ -123,6 +124,7 @@
else:
return
+ # XXX default to basic config?
if options.logger_configuration:
import ZConfig
with open(options.logger_configuration) as f:
More information about the checkins
mailing list