[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Updated the lb pool to support splitting workers by version.
jim
cvs-admin at zope.org
Wed Jul 4 17:58:02 UTC 2012
Log message for revision 127267:
Updated the lb pool to support splitting workers by version.
Changed:
U zc.resumelb/trunk/src/zc/resumelb/lb.py
A zc.resumelb/trunk/src/zc/resumelb/single_version.test
U zc.resumelb/trunk/src/zc/resumelb/tests.py
-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py 2012-07-04 17:57:52 UTC (rev 127266)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py 2012-07-04 17:57:59 UTC (rev 127267)
@@ -1,4 +1,5 @@
from bisect import bisect_left, insort
+import collections
import gevent
import gevent.hub
import gevent.pywsgi
@@ -104,13 +105,19 @@
class Pool:
def __init__(self,
- unskilled_score=None, variance=None, backlog_history=None):
+ unskilled_score=None, variance=None, backlog_history=None,
+ single_version=False):
self.workers = set()
self.nworkers = 0
self.unskilled = llist.dllist()
self.skilled = {} # rclass -> {[(score, workers)]}
self.event = gevent.event.Event()
_init_backlog(self)
+ self.single_version = single_version
+ if single_version:
+ # {version -> {worker}}
+ self.byversion = collections.defaultdict(set)
+ self.version = None
self.update_settings(dict(
unskilled_score=unskilled_score,
@@ -146,6 +153,12 @@
def __repr__(self):
outl = []
out = outl.append
+ if self.single_version:
+ out('Version: %s' % self.version)
+ for v in self.byversion:
+ if v != self.version and self.byversion[v]:
+ out(' Inactive: %s: %r' % (v, self.byversion[v]))
+
out('Request classes:')
for (rclass, skilled) in sorted(self.skilled.items()):
out(' %s: %s'
@@ -168,7 +181,7 @@
out(' %s: %r' % (backlog, sorted(workers)))
return '\n'.join(outl)
- def new_resume(self, worker, resume):
+ def _new_resume(self, worker, resume):
skilled = self.skilled
workers = self.workers
@@ -192,7 +205,26 @@
logger.info('new resume: %s', worker)
- def remove(self, worker):
+ def new_resume(self, worker, resume):
+ if self.single_version:
+ version = worker.version
+ self.byversion[version].add(worker)
+ if self.version is None:
+ self.version = version
+ if version == self.version:
+ # Adding a worker to the quorum will always preserve the quorum
+ self._new_resume(worker, resume)
+ else:
+ # Since the worker wasn't in the quorum, we don't call
+ # _new_resume, so we need to update it's resume ourselves:
+ worker.resume = resume
+
+ # Adding this worker might have created a new quorum
+ self._update_quorum()
+ else:
+ self._new_resume(worker, resume)
+
+ def _remove(self, worker):
skilled = self.skilled
for rclass, score in worker.resume.iteritems():
skilled[rclass].remove((score, worker))
@@ -201,17 +233,43 @@
worker.lnode = None
self.workers.remove(worker)
- self.backlog -= worker.backlog
- assert self.backlog >= 0, self.backlog
- _decay_backlog(self, self.decay)
-
self.nworkers = len(self.workers)
if self.nworkers:
self._update_decay()
else:
- assert self.backlog == 0, self.backlog
self.event.clear()
+ def remove(self, worker):
+
+ self.backlog -= worker.backlog
+ assert self.backlog >= 0, self.backlog
+ _decay_backlog(self, self.decay)
+
+ if self.single_version:
+ self.byversion[worker.version].remove(worker)
+ if worker.version == self.version:
+ self._remove(worker)
+ self._update_quorum()
+ # Note if the worker's version isn't self.version, it's
+ # not in the quorum, and it's removal can't cause the
+ # quorum to change.
+ else:
+ self._remove(worker)
+
+ def _update_quorum(self):
+ byversion = self.byversion
+ version = sorted(byversion, key=lambda v: -len(byversion[v]))[0]
+ if (version == self.version or
+ len(byversion[version]) == len(byversion[self.version])
+ ):
+ return # No change
+
+ for worker in byversion[self.version]:
+ self._remove(worker)
+ self.version = version
+ for worker in byversion[version]:
+ self._new_resume(worker, worker.resume)
+
def get(self, rclass, timeout=None):
"""Get a worker to handle a request class
"""
Added: zc.resumelb/trunk/src/zc/resumelb/single_version.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/single_version.test (rev 0)
+++ zc.resumelb/trunk/src/zc/resumelb/single_version.test 2012-07-04 17:57:59 UTC (rev 127267)
@@ -0,0 +1,135 @@
+===================
+Single-version pool
+===================
+
+When updating workers, it may be necessary to only run one version of
+worker software at a time. The resumelb pool has some support for
+this. If the ``single_version`` option is used, then it will group
+workers by version and only send work to workers in the largest group.
+
+Let's look at an example.
+
+ >>> import zc.resumelb.lb
+ >>> pool = zc.resumelb.lb.Pool(single_version=True)
+
+
+ >>> class Worker:
+ ... def __init__(self, name, version):
+ ... self.name = name
+ ... self.version = version
+ ... def __repr__(self):
+ ... return self.name
+ ... def __cmp__(self, other):
+ ... return cmp(self.name, other.name)
+ ... def __hash__(self):
+ ... return hash(self.name)
+ ... def handle(self, *args):
+ ... pass
+
+ >>> w1 = Worker('w1', 1)
+ >>> pool.new_resume(w1, {})
+ >>> w2 = Worker('w2', 1)
+ >>> pool.new_resume(w2, {})
+
+So, we have 2 unskilled workers. Now we'll add a skilled worker, but
+with version 2:
+
+ >>> w3 = Worker('w3', 2)
+ >>> pool.new_resume(w3, {'a': .1})
+ >>> pool.get('a', 0.0)
+ w2
+
+If we ask for a worker for 'a', we'll get one of the unskilled
+workers, even though we have a skilled worker, because the skilled
+worker had the wrong version:
+
+ >>> w4 = Worker('w4', 2)
+ >>> pool.new_resume(w4, {'b': .1})
+ >>> pool.get('b', 0.0)
+ w1
+
+If we look at the pool, we won't see the new workers we added, because
+they aren't active:
+
+ >>> print pool # doctest: +ELLIPSIS
+ Version: 1
+ Inactive: 2: set([w4, w3])
+ Request classes:
+ a: w2(1.0,1.0)
+ b: w1(1.0,1.0)
+ Backlogs:
+ overall backlog: 2 Decayed: 1.50... Avg: 0.75...
+ 1: [w1, w2]
+
+Now, if we add another worker with version 2, we'll stop using the
+version 1 workers even though they have a higher score:
+
+ >>> w5 = Worker('w5', 2)
+ >>> pool.new_resume(w5, {'c': .1})
+ >>> pool.get('c', 0.0)
+ w5
+ >>> pool.get('a', 0.0)
+ w3
+
+ >>> print pool # doctest: +ELLIPSIS
+ Version: 2
+ Inactive: 1: set([w2, w1])
+ Request classes:
+ a: w3(0.1,1.0)
+ b: w4(0.1,0)
+ c: w5(0.1,1.0)
+ Backlogs:
+ overall backlog: 4 Decayed: 2.52686361662 Avg: 0.842287872206
+ 0: [w4]
+ 1: [w3, w5]
+
+Let's finish the outstanding requests to w1:
+
+ >>> pool.put(w1)
+ >>> print pool # doctest: +ELLIPSIS
+ Version: 2
+ Inactive: 1: set([w2, w1])
+ Request classes:
+ a: w3(0.1,1.0)
+ b: w4(0.1,0)
+ c: w5(0.1,1.0)
+ Backlogs:
+ overall backlog: 3 Decayed: 2.62523984999 Avg: 0.875079949996
+ 0: [w4]
+ 1: [w3, w5]
+
+Now, disconnect w4 and w5, the quorum will switch to version 1:
+
+ >>> pool.remove(w4)
+ >>> pool.remove(w5)
+ >>> print pool # doctest: +ELLIPSIS
+ Version: 1
+ Inactive: 2: set([w3])
+ Request classes:
+ a: w2(1.0,0)
+ b: w1(1.0,0)
+ c:
+ Backlogs:
+ overall backlog: 2 Decayed: 2.58547913529 Avg: 1.29273956765
+ 0: [w1, w2]
+
+If we make a request, it'll be handled by the bew quorum:
+
+ >>> pool.get('b', 0.0)
+ w1
+
+If we remove the last version 2:
+
+ >>> pool.remove(w3)
+ >>> print pool # doctest: +ELLIPSIS
+ Version: 1
+ Request classes:
+ a: w2(1.0,0)
+ b: w1(1.0,1.0)
+ c:
+ Backlogs:
+ overall backlog: 2 Decayed: 2.56315626836 Avg: 1.28157813418
+ 0: [w2]
+ 1: [w1]
+
+We don't print inactive, since there aren't any.
Property changes on: zc.resumelb/trunk/src/zc/resumelb/single_version.test
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-07-04 17:57:52 UTC (rev 127266)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-07-04 17:57:59 UTC (rev 127267)
@@ -342,7 +342,7 @@
])
) + manuel.capture.Manuel(),
'lb.test', 'pool.test', 'worker.test', 'bytesizedqueue.test',
- 'bufferedqueue.test',
+ 'bufferedqueue.test', 'single_version.test',
setUp=setUp, tearDown=zope.testing.setupstack.tearDown),
manuel.testing.TestSuite(
More information about the checkins
mailing list