[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Updated the lb pool to support splitting workers by version.

jim cvs-admin at zope.org
Wed Jul 4 17:58:02 UTC 2012


Log message for revision 127267:
  Updated the lb pool to support splitting workers by version.
  

Changed:
  U   zc.resumelb/trunk/src/zc/resumelb/lb.py
  A   zc.resumelb/trunk/src/zc/resumelb/single_version.test
  U   zc.resumelb/trunk/src/zc/resumelb/tests.py

-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-07-04 17:57:52 UTC (rev 127266)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-07-04 17:57:59 UTC (rev 127267)
@@ -1,4 +1,5 @@
 from bisect import bisect_left, insort
+import collections
 import gevent
 import gevent.hub
 import gevent.pywsgi
@@ -104,13 +105,19 @@
 class Pool:
 
     def __init__(self,
-                 unskilled_score=None, variance=None, backlog_history=None):
+                 unskilled_score=None, variance=None, backlog_history=None,
+                 single_version=False):
         self.workers = set()
         self.nworkers = 0
         self.unskilled = llist.dllist()
         self.skilled = {}   # rclass -> {[(score, workers)]}
         self.event = gevent.event.Event()
         _init_backlog(self)
+        self.single_version = single_version
+        if single_version:
+            # {version -> {worker}}
+            self.byversion = collections.defaultdict(set)
+            self.version = None
 
         self.update_settings(dict(
             unskilled_score=unskilled_score,
@@ -146,6 +153,12 @@
     def __repr__(self):
         outl = []
         out = outl.append
+        if self.single_version:
+            out('Version: %s' % self.version)
+            for v in self.byversion:
+                if v != self.version and self.byversion[v]:
+                    out('  Inactive: %s: %r' % (v, self.byversion[v]))
+
         out('Request classes:')
         for (rclass, skilled) in sorted(self.skilled.items()):
             out('  %s: %s'
@@ -168,7 +181,7 @@
             out('  %s: %r' % (backlog, sorted(workers)))
         return '\n'.join(outl)
 
-    def new_resume(self, worker, resume):
+    def _new_resume(self, worker, resume):
         skilled = self.skilled
         workers = self.workers
 
@@ -192,7 +205,26 @@
 
         logger.info('new resume: %s', worker)
 
-    def remove(self, worker):
+    def new_resume(self, worker, resume):
+        if self.single_version:
+            version = worker.version
+            self.byversion[version].add(worker)
+            if self.version is None:
+                self.version = version
+            if version == self.version:
+                # Adding a worker to the quorum will always preserve the quorum
+                self._new_resume(worker, resume)
+            else:
+                # Since the worker wasn't in the quorum, we don't call
+                # _new_resume, so we need to update it's resume ourselves:
+                worker.resume = resume
+
+                # Adding this worker might have created a new quorum
+                self._update_quorum()
+        else:
+            self._new_resume(worker, resume)
+
+    def _remove(self, worker):
         skilled = self.skilled
         for rclass, score in worker.resume.iteritems():
             skilled[rclass].remove((score, worker))
@@ -201,17 +233,43 @@
             worker.lnode = None
         self.workers.remove(worker)
 
-        self.backlog -= worker.backlog
-        assert self.backlog >= 0, self.backlog
-        _decay_backlog(self, self.decay)
-
         self.nworkers = len(self.workers)
         if self.nworkers:
             self._update_decay()
         else:
-            assert self.backlog == 0, self.backlog
             self.event.clear()
 
+    def remove(self, worker):
+
+        self.backlog -= worker.backlog
+        assert self.backlog >= 0, self.backlog
+        _decay_backlog(self, self.decay)
+
+        if self.single_version:
+            self.byversion[worker.version].remove(worker)
+            if worker.version == self.version:
+                self._remove(worker)
+                self._update_quorum()
+            # Note if the worker's version isn't self.version, it's
+            # not in the quorum, and it's removal can't cause the
+            # quorum to change.
+        else:
+            self._remove(worker)
+
+    def _update_quorum(self):
+        byversion = self.byversion
+        version = sorted(byversion, key=lambda v: -len(byversion[v]))[0]
+        if (version == self.version or
+            len(byversion[version]) == len(byversion[self.version])
+            ):
+            return # No change
+
+        for worker in byversion[self.version]:
+            self._remove(worker)
+        self.version = version
+        for worker in byversion[version]:
+            self._new_resume(worker, worker.resume)
+
     def get(self, rclass, timeout=None):
         """Get a worker to handle a request class
         """

Added: zc.resumelb/trunk/src/zc/resumelb/single_version.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/single_version.test	                        (rev 0)
+++ zc.resumelb/trunk/src/zc/resumelb/single_version.test	2012-07-04 17:57:59 UTC (rev 127267)
@@ -0,0 +1,135 @@
+===================
+Single-version pool
+===================
+
+When updating workers, it may be necessary to only run one version of
+worker software at a time.  The resumelb pool has some support for
+this.  If the ``single_version`` option is used, then it will group
+workers by version and only send work to workers in the largest group.
+
+Let's look at an example.
+
+    >>> import zc.resumelb.lb
+    >>> pool = zc.resumelb.lb.Pool(single_version=True)
+
+
+    >>> class Worker:
+    ...     def __init__(self, name, version):
+    ...         self.name = name
+    ...         self.version = version
+    ...     def __repr__(self):
+    ...         return self.name
+    ...     def __cmp__(self, other):
+    ...         return cmp(self.name, other.name)
+    ...     def __hash__(self):
+    ...         return hash(self.name)
+    ...     def handle(self, *args):
+    ...         pass
+
+    >>> w1 = Worker('w1', 1)
+    >>> pool.new_resume(w1, {})
+    >>> w2 = Worker('w2', 1)
+    >>> pool.new_resume(w2, {})
+
+So, we have 2 unskilled workers. Now we'll add a skilled worker, but
+with version 2:
+
+    >>> w3 = Worker('w3', 2)
+    >>> pool.new_resume(w3, {'a': .1})
+    >>> pool.get('a', 0.0)
+    w2
+
+If we ask for a worker for 'a', we'll get one of the unskilled
+workers, even though we have a skilled worker, because the skilled
+worker had the wrong version:
+
+    >>> w4 = Worker('w4', 2)
+    >>> pool.new_resume(w4, {'b': .1})
+    >>> pool.get('b', 0.0)
+    w1
+
+If we look at the pool, we won't see the new workers we added, because
+they aren't active:
+
+    >>> print pool # doctest: +ELLIPSIS
+    Version: 1
+      Inactive: 2: set([w4, w3])
+    Request classes:
+      a: w2(1.0,1.0)
+      b: w1(1.0,1.0)
+    Backlogs:
+      overall backlog: 2 Decayed: 1.50... Avg: 0.75...
+      1: [w1, w2]
+
+Now, if we add another worker with version 2, we'll stop using the
+version 1 workers even though they have a higher score:
+
+    >>> w5 = Worker('w5', 2)
+    >>> pool.new_resume(w5, {'c': .1})
+    >>> pool.get('c', 0.0)
+    w5
+    >>> pool.get('a', 0.0)
+    w3
+
+    >>> print pool # doctest: +ELLIPSIS
+    Version: 2
+      Inactive: 1: set([w2, w1])
+    Request classes:
+      a: w3(0.1,1.0)
+      b: w4(0.1,0)
+      c: w5(0.1,1.0)
+    Backlogs:
+      overall backlog: 4 Decayed: 2.52686361662 Avg: 0.842287872206
+      0: [w4]
+      1: [w3, w5]
+
+Let's finish the outstanding requests to w1:
+
+    >>> pool.put(w1)
+    >>> print pool # doctest: +ELLIPSIS
+    Version: 2
+      Inactive: 1: set([w2, w1])
+    Request classes:
+      a: w3(0.1,1.0)
+      b: w4(0.1,0)
+      c: w5(0.1,1.0)
+    Backlogs:
+      overall backlog: 3 Decayed: 2.62523984999 Avg: 0.875079949996
+      0: [w4]
+      1: [w3, w5]
+
+Now, disconnect w4 and w5, the quorum will switch to version 1:
+
+    >>> pool.remove(w4)
+    >>> pool.remove(w5)
+    >>> print pool # doctest: +ELLIPSIS
+    Version: 1
+      Inactive: 2: set([w3])
+    Request classes:
+      a: w2(1.0,0)
+      b: w1(1.0,0)
+      c: 
+    Backlogs:
+      overall backlog: 2 Decayed: 2.58547913529 Avg: 1.29273956765
+      0: [w1, w2]
+
+If we make a request, it'll be handled by the bew quorum:
+
+    >>> pool.get('b', 0.0)
+    w1
+
+If we remove the last version 2:
+
+    >>> pool.remove(w3)
+    >>> print pool # doctest: +ELLIPSIS
+    Version: 1
+    Request classes:
+      a: w2(1.0,0)
+      b: w1(1.0,1.0)
+      c: 
+    Backlogs:
+      overall backlog: 2 Decayed: 2.56315626836 Avg: 1.28157813418
+      0: [w2]
+      1: [w1]
+
+We don't print inactive, since there aren't any.


Property changes on: zc.resumelb/trunk/src/zc/resumelb/single_version.test
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-07-04 17:57:52 UTC (rev 127266)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-07-04 17:57:59 UTC (rev 127267)
@@ -342,7 +342,7 @@
                     ])
                 ) + manuel.capture.Manuel(),
             'lb.test', 'pool.test', 'worker.test', 'bytesizedqueue.test',
-            'bufferedqueue.test',
+            'bufferedqueue.test', 'single_version.test',
             setUp=setUp, tearDown=zope.testing.setupstack.tearDown),
 
         manuel.testing.TestSuite(



More information about the checkins mailing list