[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Fixed: When used with ZooKeeper, a load balancer could end up with
jim
cvs-admin at zope.org
Mon Oct 15 20:14:32 UTC 2012
Log message for revision 128016:
Fixed: When used with ZooKeeper, a load balancer could end up with
multiple connections to the same worker due to ZooKeeper
"flapping". (ZooKeeper might report that workers had gone away and
come back without the workers actually going away.)
Changed:
U zc.resumelb/trunk/src/zc/resumelb/README.txt
U zc.resumelb/trunk/src/zc/resumelb/lb.py
U zc.resumelb/trunk/src/zc/resumelb/lb.test
U zc.resumelb/trunk/src/zc/resumelb/tests.py
U zc.resumelb/trunk/src/zc/resumelb/zk.test
-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/README.txt
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/README.txt 2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/README.txt 2012-10-15 20:14:29 UTC (rev 128016)
@@ -242,6 +242,14 @@
Change History
==============
+0.7.1 (2012-10-15)
+------------------
+
+- Fixed: When used with ZooKeeper, a load balancer could end up with
+ multiple connections to the same worker due to ZooKeeper
+ "flapping". (ZooKeeper might report that workers had gone away and
+ come back without the workers actually going away.)
+
0.7.0 (2012-07-05)
------------------
Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py 2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py 2012-10-15 20:14:29 UTC (rev 128016)
@@ -44,31 +44,31 @@
addrs = dict((addr, None) for addr in addrs)
workletts = self.workletts
- old = list(workletts)
+
+ self.worker_addrs = addrs
for addr in addrs:
if addr not in workletts:
workletts[addr] = gevent.spawn(
self.connect, addr, workletts, addrs[addr])
- for addr in old:
- if addr not in addrs:
- workletts.pop(addr)
-
connect_sleep = 1.0
def connect(self, addr, workletts, version):
- while addr in workletts:
- try:
- socket = gevent.socket.create_connection(addr)
- Worker(self.pool, socket, addr, version)
- except gevent.GreenletExit, v:
+ try:
+ while addr in self.worker_addrs:
try:
- socket.close()
- except:
- pass
- raise
- except Exception, v:
- logger.exception('lb connecting to %r', addr)
- gevent.sleep(self.connect_sleep)
+ socket = gevent.socket.create_connection(addr)
+ Worker(self.pool, socket, addr, version)
+ except gevent.GreenletExit, v:
+ try:
+ socket.close()
+ except:
+ pass
+ raise
+ except Exception, v:
+ logger.exception('lb connecting to %r', addr)
+ gevent.sleep(self.connect_sleep)
+ finally:
+ del self.workletts[addr]
def stop(self):
for g in self.workletts.values():
Modified: zc.resumelb/trunk/src/zc/resumelb/lb.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.test 2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.test 2012-10-15 20:14:29 UTC (rev 128016)
@@ -323,7 +323,7 @@
Now, we'll disconnect worker1:
- >>> worker1.close()
+ >>> workers[0].close()
>>> gevent.sleep(.1)
The second GET and the HEAD request will be sent to worker2:
@@ -406,8 +406,7 @@
>>> lb.connect_sleep = 0.01
>>> port = workers[0].server.server_port # We'll reuse below
>>> workers[0].server.stop()
- >>> socket = workers[0].socket
- >>> socket.close()
+ >>> socket = workers[0].close()
>>> gevent.sleep(.01)
>>> len(lb.pool.workers)
1
@@ -415,15 +414,15 @@
OK, so we lost the worker and the lb didn't reconnect because the
worker server is down:
- >>> workers[0].socket is socket
- True
+ >>> hasattr(workers[0], 'socket')
+ False
Now, we'll recreate the worker server and after a bit, the lb should reconnect:
>>> workers[0].server = gevent.server.StreamServer(
... ('127.0.0.1', port), workers[0].handle)
>>> workers[0].server.start()
- >>> wait(lambda : workers[0].socket is not socket)
+ >>> wait(lambda : hasattr(workers[0], 'socket'))
>>> write_message(workers[0].socket, 0, {'h3.com': 10.0})
>>> gevent.sleep(.01)
>>> len(lb.pool.workers)
@@ -498,7 +497,7 @@
(only new workers are affected.)
- >>> workers.pop(-1).socket.close()
+ >>> workers.pop(-1).close()
>>> lb.set_worker_addrs([w.addr for w in workers])
Graceful shutdown
Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-10-15 20:14:29 UTC (rev 128016)
@@ -143,9 +143,18 @@
('127.0.0.1', 0), self.handle)
server.start()
self.addr = '127.0.0.1', server.server_port
+
def handle(self, socket, addr):
- self.socket = socket
+ if not hasattr(self, 'socket'):
+ self.socket = socket
+ else:
+ raise AssertionError("worker got too many connections", self.addr)
+ def close(self):
+ socket = self.socket
+ del self.socket
+ socket.close()
+
def test_loading_recipes_with_no_history_argument():
"""A bug as introduced that caused resumes to be loaded
incorrectly when no history was given to the constructor. It
@@ -294,9 +303,52 @@
>>> server.stop()
>>> lb.stop()
>>> zk.close()
+ """
+def flappy_set_worker_addrs_doesnt_cause_duplicate_connections():
"""
+ >>> workers = [Worker() for i in range(2)]
+ >>> import zc.resumelb.lb
+ >>> lb = zc.resumelb.lb.LB([w.addr for w in workers],
+ ... zc.resumelb.lb.host_classifier, variance=4)
+ >>> wait(
+ ... lambda :
+ ... len([w for w in workers if hasattr(w, 'socket')]) == len(workers)
+ ... )
+ >>> worker1, worker2 = [w.socket for w in workers]
+ >>> from zc.resumelb.util import read_message, write_message
+
+ >>> write_message(worker1, 0, {'h1.com': 10.0})
+ >>> write_message(worker2, 0, {'h2.com': 10.0})
+
+ >>> import gevent
+ >>> gevent.sleep(.01) # Give resumes time to arrive
+
+ >>> print lb.pool
+ Request classes:
+ h1.com: 127.0.0.1:49927(10.0,0)
+ h2.com: 127.0.0.1:36316(10.0,0)
+ Backlogs:
+ overall backlog: 0 Decayed: 0 Avg: 0
+ 0: [127.0.0.1:49927, 127.0.0.1:36316]
+
+ Now, we'll flap the worker addrs:
+
+ >>> lb.set_worker_addrs([])
+ >>> lb.set_worker_addrs([w.addr for w in workers])
+ >>> gevent.sleep(.01) # Give resumes time to arrive
+
+ >>> print lb.pool
+ Request classes:
+ h1.com: 127.0.0.1:49927(10.0,0)
+ h2.com: 127.0.0.1:36316(10.0,0)
+ Backlogs:
+ overall backlog: 0 Decayed: 0 Avg: 0
+ 0: [127.0.0.1:49927, 127.0.0.1:36316]
+
+ """
+
def test_classifier(env):
return "yup, it's a test"
@@ -366,6 +418,9 @@
'zk.test',
setUp=zkSetUp, tearDown=zkTearDown),
doctest.DocTestSuite(
- setUp=zkSetUp, tearDown=zope.testing.setupstack.tearDown),
+ setUp=zkSetUp, tearDown=zope.testing.setupstack.tearDown,
+ checker = zope.testing.renormalizing.OutputChecker([
+ (re.compile(r'127.0.0.1:\d+'), '127.0.0.1:P'),
+ ])),
))
Modified: zc.resumelb/trunk/src/zc/resumelb/zk.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/zk.test 2012-10-15 11:40:38 UTC (rev 128015)
+++ zc.resumelb/trunk/src/zc/resumelb/zk.test 2012-10-15 20:14:29 UTC (rev 128016)
@@ -441,7 +441,7 @@
We see that there are fewer workers:
- >>> len(lb.workletts)
+ >>> len(lb.worker_addrs)
0
Note that if we looked at the number of pools, it would still be
More information about the checkins
mailing list