[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Fixed: When used with ZooKeeper, a load balancer could end up with

jim cvs-admin at zope.org
Mon Oct 15 20:14:32 UTC 2012


Log message for revision 128016:
  Fixed: When used with ZooKeeper, a load balancer could end up with
  multiple connections to the same worker due to ZooKeeper
  "flapping".  (ZooKeeper might report that workers had gone away and
  come back without the workers actually going away.)
  

Changed:
  U   zc.resumelb/trunk/src/zc/resumelb/README.txt
  U   zc.resumelb/trunk/src/zc/resumelb/lb.py
  U   zc.resumelb/trunk/src/zc/resumelb/lb.test
  U   zc.resumelb/trunk/src/zc/resumelb/tests.py
  U   zc.resumelb/trunk/src/zc/resumelb/zk.test

-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/README.txt
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-10-15 20:14:29 UTC (rev 128016)
@@ -242,6 +242,14 @@
 Change History
 ==============
 
+0.7.1 (2012-10-15)
+------------------
+
+- Fixed: When used with ZooKeeper, a load balancer could end up with
+  multiple connections to the same worker due to ZooKeeper
+  "flapping".  (ZooKeeper might report that workers had gone away and
+  come back without the workers actually going away.)
+
 0.7.0 (2012-07-05)
 ------------------
 

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-10-15 20:14:29 UTC (rev 128016)
@@ -44,31 +44,31 @@
             addrs = dict((addr, None) for addr in addrs)
 
         workletts = self.workletts
-        old = list(workletts)
+
+        self.worker_addrs = addrs
         for addr in addrs:
             if addr not in workletts:
                 workletts[addr] = gevent.spawn(
                     self.connect, addr, workletts, addrs[addr])
 
-        for addr in old:
-            if addr not in addrs:
-                workletts.pop(addr)
-
     connect_sleep = 1.0
     def connect(self, addr, workletts, version):
-        while addr in workletts:
-            try:
-                socket = gevent.socket.create_connection(addr)
-                Worker(self.pool, socket, addr, version)
-            except gevent.GreenletExit, v:
+        try:
+            while addr in self.worker_addrs:
                 try:
-                    socket.close()
-                except:
-                    pass
-                raise
-            except Exception, v:
-                logger.exception('lb connecting to %r', addr)
-                gevent.sleep(self.connect_sleep)
+                    socket = gevent.socket.create_connection(addr)
+                    Worker(self.pool, socket, addr, version)
+                except gevent.GreenletExit, v:
+                    try:
+                        socket.close()
+                    except:
+                        pass
+                    raise
+                except Exception, v:
+                    logger.exception('lb connecting to %r', addr)
+                    gevent.sleep(self.connect_sleep)
+        finally:
+            del self.workletts[addr]
 
     def stop(self):
         for g in self.workletts.values():

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-10-15 20:14:29 UTC (rev 128016)
@@ -323,7 +323,7 @@
 
 Now, we'll disconnect worker1:
 
-    >>> worker1.close()
+    >>> workers[0].close()
     >>> gevent.sleep(.1)
 
 The second GET and the HEAD request will be sent to worker2:
@@ -406,8 +406,7 @@
     >>> lb.connect_sleep = 0.01
     >>> port = workers[0].server.server_port # We'll reuse below
     >>> workers[0].server.stop()
-    >>> socket = workers[0].socket
-    >>> socket.close()
+    >>> socket = workers[0].close()
     >>> gevent.sleep(.01)
     >>> len(lb.pool.workers)
     1
@@ -415,15 +414,15 @@
 OK, so we lost the worker and the lb didn't reconnect because the
 worker server is down:
 
-    >>> workers[0].socket is socket
-    True
+    >>> hasattr(workers[0], 'socket')
+    False
 
 Now, we'll recreate the worker server and after a bit, the lb should reconnect:
 
     >>> workers[0].server = gevent.server.StreamServer(
     ...     ('127.0.0.1', port), workers[0].handle)
     >>> workers[0].server.start()
-    >>> wait(lambda : workers[0].socket is not socket)
+    >>> wait(lambda : hasattr(workers[0], 'socket'))
     >>> write_message(workers[0].socket, 0, {'h3.com': 10.0})
     >>> gevent.sleep(.01)
     >>> len(lb.pool.workers)
@@ -498,7 +497,7 @@
 
 (only new workers are affected.)
 
-    >>> workers.pop(-1).socket.close()
+    >>> workers.pop(-1).close()
     >>> lb.set_worker_addrs([w.addr for w in workers])
 
 Graceful shutdown

Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-10-15 20:14:29 UTC (rev 128016)
@@ -143,9 +143,18 @@
             ('127.0.0.1', 0), self.handle)
         server.start()
         self.addr = '127.0.0.1', server.server_port
+
     def handle(self, socket, addr):
-        self.socket = socket
+        if not hasattr(self, 'socket'):
+            self.socket = socket
+        else:
+            raise AssertionError("worker got too many connections", self.addr)
 
+    def close(self):
+        socket = self.socket
+        del self.socket
+        socket.close()
+
 def test_loading_recipes_with_no_history_argument():
     """A bug was introduced that caused resumes to be loaded
     incorrectly when no history was given to the constructor.  It
@@ -294,9 +303,52 @@
     >>> server.stop()
     >>> lb.stop()
     >>> zk.close()
+    """
 
+def flappy_set_worker_addrs_doesnt_cause_duplicate_connections():
     """
+    >>> workers = [Worker() for i in range(2)]
+    >>> import zc.resumelb.lb
+    >>> lb = zc.resumelb.lb.LB([w.addr for w in workers],
+    ...                        zc.resumelb.lb.host_classifier, variance=4)
 
+    >>> wait(
+    ...     lambda :
+    ...     len([w for w in workers if hasattr(w, 'socket')]) == len(workers)
+    ...     )
+    >>> worker1, worker2 = [w.socket for w in workers]
+    >>> from zc.resumelb.util import read_message, write_message
+
+    >>> write_message(worker1, 0, {'h1.com': 10.0})
+    >>> write_message(worker2, 0, {'h2.com': 10.0})
+
+    >>> import gevent
+    >>> gevent.sleep(.01) # Give resumes time to arrive
+
+    >>> print lb.pool
+    Request classes:
+      h1.com: 127.0.0.1:49927(10.0,0)
+      h2.com: 127.0.0.1:36316(10.0,0)
+    Backlogs:
+      overall backlog: 0 Decayed: 0 Avg: 0
+      0: [127.0.0.1:49927, 127.0.0.1:36316]
+
+    Now, we'll flap the worker addrs:
+
+    >>> lb.set_worker_addrs([])
+    >>> lb.set_worker_addrs([w.addr for w in workers])
+    >>> gevent.sleep(.01) # Give resumes time to arrive
+
+    >>> print lb.pool
+    Request classes:
+      h1.com: 127.0.0.1:49927(10.0,0)
+      h2.com: 127.0.0.1:36316(10.0,0)
+    Backlogs:
+      overall backlog: 0 Decayed: 0 Avg: 0
+      0: [127.0.0.1:49927, 127.0.0.1:36316]
+
+    """
+
 def test_classifier(env):
     return "yup, it's a test"
 
@@ -366,6 +418,9 @@
             'zk.test',
             setUp=zkSetUp, tearDown=zkTearDown),
         doctest.DocTestSuite(
-            setUp=zkSetUp, tearDown=zope.testing.setupstack.tearDown),
+            setUp=zkSetUp, tearDown=zope.testing.setupstack.tearDown,
+            checker = zope.testing.renormalizing.OutputChecker([
+                    (re.compile(r'127.0.0.1:\d+'), '127.0.0.1:P'),
+                    ])),
         ))
 

Modified: zc.resumelb/trunk/src/zc/resumelb/zk.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/zk.test	2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/zk.test	2012-10-15 20:14:29 UTC (rev 128016)
@@ -441,7 +441,7 @@
 
 We see that there are fewer workers:
 
-    >>> len(lb.workletts)
+    >>> len(lb.worker_addrs)
     0
 
 Note that if we looked at the number of pools, it would still be



More information about the checkins mailing list