[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/lb. Handle workers that totally go away and come back.

Jim Fulton jim at zope.com
Sun Jan 22 18:17:22 UTC 2012


Log message for revision 124130:
  Handle workers that totally go away and come back.
  
  Updated main to reflect new connection direction.
  

Changed:
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.test

-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-01-22 18:17:19 UTC (rev 124129)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-01-22 18:17:21 UTC (rev 124130)
@@ -45,10 +45,15 @@
             if addr not in addrs:
                 workletts.pop(addr)
 
+    connect_sleep = 1.0
     def connect(self, addr, workletts):
         while addr in workletts:
-            socket = gevent.socket.create_connection(addr)
-            Worker(self.pool, socket, addr)
+            try:
+                socket = gevent.socket.create_connection(addr)
+                Worker(self.pool, socket, addr)
+            except Exception:
+                logger.exception('lb connecting to %r', addr)
+                gevent.sleep(self.connect_sleep)
 
     def handle_wsgi(self, env, start_response):
         rclass = self.classifier(env)
@@ -322,9 +327,10 @@
         args = sys.argv[1:]
 
     logging.basicConfig(level=logging.INFO)
-    wsgi_addr, lb_addr = map(parse_addr, args)
+    addrs = map(parse_addr, args)
+    wsgi_addr = addrs.pop(0)
 
-    lb = LB(lb_addr, host_classifier)
+    lb = LB(addrs, host_classifier)
     gevent.pywsgi.WSGIServer(wsgi_addr, lb.handle_wsgi).serve_forever()
 
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-22 18:17:19 UTC (rev 124129)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-22 18:17:21 UTC (rev 124130)
@@ -13,7 +13,7 @@
     >>> import gevent.server
     >>> class Worker:
     ...     def __init__(self):
-    ...         server = gevent.server.StreamServer(
+    ...         self.server = server = gevent.server.StreamServer(
     ...             ('127.0.0.1', 0), self.handle)
     ...         server.start()
     ...         self.addr = '127.0.0.1', server.server_port
@@ -297,6 +297,39 @@
     >>> len(lb.pool.workers)
     2
 
+In the test above, the worker was still listening the whole time.
+Let's go a bit further.  We'll shut down the worker's server as well
+as the worker socket. When the lb gets an error, it sleeps for a second
+after failed attempts.  We can change this by setting a class variable
+that exists primarily for testing.  We'll change it on the lb.
+
+    >>> lb.connect_sleep = 0.01
+    >>> port = workers[0].server.server_port # We'll reuse below
+    >>> workers[0].server.kill()
+    >>> socket = workers[0].socket
+    >>> socket.close()
+    >>> gevent.sleep(.01)
+    >>> len(lb.pool.workers)
+    1
+
+OK, so we lost the worker and the lb didn't reconnect because the
+worker server is down:
+
+    >>> workers[0].socket is socket
+    True
+
+Now, we'll recreate the worker server and after a bit, the lb should reconnect:
+
+    >>> workers[0].server = gevent.server.StreamServer(
+    ...     ('127.0.0.1', port), workers[0].handle)
+    >>> workers[0].server.start()
+    >>> wait_until(lambda : workers[0].socket is not socket)
+    >>> write_message(workers[0].socket, 0, {'h3.com': 10.0})
+    >>> gevent.sleep(.01)
+    >>> len(lb.pool.workers)
+    2
+
+
 Adding and removing workers
 ===========================
 
@@ -346,3 +379,4 @@
 
 Typically, by the time we remove an address, the worker will already
 have gone away.
+
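
The reconnect behaviour added to connect() in lb.py above can be sketched
outside of gevent as follows. This is only an illustration, not the code in
the revision: connect_loop, serve, and the injectable connect and sleep
parameters are hypothetical names introduced here, the standard library's
socket.create_connection and time.sleep stand in for their gevent
counterparts, and the sketch assumes that the per-connection handler (the
role Worker plays in the diff) blocks until the connection is lost.

```python
import logging
import socket
import time

logger = logging.getLogger(__name__)


def connect_loop(addr, wanted, serve, connect_sleep=1.0,
                 connect=socket.create_connection, sleep=time.sleep):
    """Keep a connection to addr alive for as long as addr is in wanted.

    serve(sock) is a hypothetical handler that is assumed to block until
    the connection goes away.  Any failure, whether while connecting or
    while serving, is logged and followed by a short pause, so a worker
    that is completely down does not cause a busy loop.
    """
    while addr in wanted:
        try:
            sock = connect(addr)
            serve(sock)
        except Exception:
            logger.exception('lb connecting to %r', addr)
            sleep(connect_sleep)
```

As in the doctest, a small connect_sleep (for example 0.01) keeps tests
fast, and passing substitute connect and sleep callables lets the retry
path be exercised without opening real sockets.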


