[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Added graceful load-balancer shutdown on SIGTERM.

jim cvs-admin at zope.org
Thu Mar 22 16:51:32 UTC 2012


Log message for revision 124693:
  Added graceful load-balancer shutdown on SIGTERM.
  

Changed:
  U   zc.resumelb/trunk/src/zc/resumelb/README.txt
  U   zc.resumelb/trunk/src/zc/resumelb/lb.py
  U   zc.resumelb/trunk/src/zc/resumelb/lb.test
  U   zc.resumelb/trunk/src/zc/resumelb/zk.py
  U   zc.resumelb/trunk/src/zc/resumelb/zk.test

-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/README.txt
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-03-22 14:12:28 UTC (rev 124692)
+++ zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-03-22 16:51:27 UTC (rev 124693)
@@ -243,7 +243,7 @@
 0.1.1 (2012-03-??)
 ------------------
 
-- Added graceful worker shutdown on SIGTERM.
+- Added graceful load-balancer and worker shutdown on SIGTERM.
 
 - Updated the API for application trace logging to match that of
   zc.zservertracelog, mainly to get database logging for ZTK

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-03-22 14:12:28 UTC (rev 124692)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-03-22 16:51:27 UTC (rev 124693)
@@ -54,7 +54,13 @@
             try:
                 socket = gevent.socket.create_connection(addr)
                 Worker(self.pool, socket, addr)
-            except Exception:
+            except gevent.GreenletExit, v:
+                try:
+                    socket.close()
+                except:
+                    pass
+                raise
+            except Exception, v:
                 logger.exception('lb connecting to %r', addr)
                 gevent.sleep(self.connect_sleep)
 
@@ -63,6 +69,11 @@
             g.kill()
         self.workletts.clear()
 
+    def shutdown(self):
+        while self.pool.backlog:
+            gevent.sleep(.01)
+        self.stop()
+
     def handle_wsgi(self, env, start_response):
         rclass = self.classifier(env)
         logger.debug('wsgi: %s', rclass)
@@ -173,10 +184,16 @@
             self.unskilled.remove(worker.lnode)
             worker.lnode = None
         self.workers.remove(worker)
+
+        self.backlog -= worker.backlog
+        assert self.backlog >= 0, self.backlog
+        _decay_backlog(self, self.decay)
+
         self.nworkers = len(self.workers)
         if self.nworkers:
             self._update_decay()
         else:
+            assert self.backlog == 0, self.backlog
             self.event.clear()
 
     def get(self, rclass, timeout=None):

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-03-22 14:12:28 UTC (rev 124692)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-03-22 16:51:27 UTC (rev 124693)
@@ -316,6 +316,12 @@
     </body></html>
     <BLANKLINE>
 
+Note that at this point, there shouldn't be any in-flight requests and
+the backlog should be 0:
+
+    >>> lb.pool.backlog
+    0
+
 Automatic reconnection
 ======================
 
@@ -421,12 +427,59 @@
 Typically, by the time we remove an address, the worker will already
 have gone away.
 
-----------------------------------------------------------
+Graceful shutdown
+=================
 
-Cleanup:
+Load balancers have a shutdown method that:
 
-    >>> lb.stop()
+- stops accepting web connections
+- waits for requests to be worked off
+- disconnects from workers
 
+We already have a request in flight.  Let's add another on a different
+worker:
+
+    >>> g2 = gevent.spawn(app2.get, '/hi.html', {}, [('Host', 'h2.com')])
+
+    >>> gevent.sleep(.01)
+    >>> [w.backlog for w in lb.pool.workers]
+    [1, 1]
+
+If we call shutdown, it will block until we have no in-flight
+connections, so we'll call it in a greenlet:
+
+    >>> shutdown_greenlet = gevent.spawn(lb.shutdown)
+    >>> gevent.sleep(.01)
+    >>> shutdown_greenlet.ready()
+    False
+
+Now, let's finish the outstanding requests:
+
+    >>> write_message(workers[-1].socket, rno,
+    ...      (response.status, response.headers.items()))
+    >>> write_message(workers[-1].socket, rno, response.body)
+    >>> write_message(workers[-1].socket, rno, '')
+    >>> g.join()
+
+    >>> rno, env = read_message(workers[0].socket)
+    >>> read_message(workers[0].socket)
+    (4, '')
+
+    >>> write_message(workers[0].socket, rno,
+    ...      (response.status, response.headers.items()))
+    >>> write_message(workers[0].socket, rno, response.body)
+    >>> write_message(workers[0].socket, rno, '')
+    >>> g2.join()
+
+    >>> gevent.sleep(.01)
+    >>> shutdown_greenlet.ready()
+    True
+
+At this point, the worker sockets are closed:
+
+    >>> [w.socket.recv(1) for w in workers]
+    ['', '']
+
 Built-in request classifiers
 ============================
 

Modified: zc.resumelb/trunk/src/zc/resumelb/zk.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/zk.py	2012-03-22 14:12:28 UTC (rev 124692)
+++ zc.resumelb/trunk/src/zc/resumelb/zk.py	2012-03-22 16:51:27 UTC (rev 124693)
@@ -209,6 +209,13 @@
     zk.register_server(path+'/providers', (addr[0], server.server_port),
                        **registration_data)
 
+    def shutdown():
+        zk.close()
+        server.close()
+        lb.shutdown()
+
+    gevent.signal(signal.SIGTERM, shutdown)
+
     if run:
         try:
             server.serve_forever()

Modified: zc.resumelb/trunk/src/zc/resumelb/zk.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/zk.test	2012-03-22 14:12:28 UTC (rev 124692)
+++ zc.resumelb/trunk/src/zc/resumelb/zk.test	2012-03-22 16:51:27 UTC (rev 124693)
@@ -166,7 +166,6 @@
     ...     if configureLoggers.called: print 'configureLoggers'
     ...     basicConfig.assert_called_with(level=42)
 
-
 LB
 ==
 
@@ -210,9 +209,13 @@
 
 Let's start with a simple call:
 
+
+    >>> gevent.signal.reset_mock()
     >>> lb, server, accesslog = zc.resumelb.zk.lbmain(
     ...     'zookeeper.example.com:2181 /test/lb')
 
+    >>> sig, sighandler = gevent.signal.call_args[0]
+
     >>> import sys
     >>> accesslog is sys.stdout
     True
@@ -265,13 +268,35 @@
     >>> len(lb.pool.workers)
     2
 
+Shutdown
+--------
+
+A shutdown signal handler is registered.  We can call it to shut the
+load balancer down:
+
+    >>> sig == signal.SIGTERM
+    True
+
+    >>> sighandler()
+    >>> gevent.sleep(.01)
+    >>> zk.get_children('/test/lb/providers')
+    []
+
+    >>> gevent.socket.create_connection(('127.0.0.1', addr[1]))
+    Traceback (most recent call last):
+    ...
+    error: [Errno 111] Connection refused
+
+    >>> lb.workletts
+    {}
+
+
+Variations
+-----------
+
 OK, so let's try a more complex example.  Maybe we can exercise all of
 the options!
 
-    >>> server.stop()
-    >>> lb.stop()
-    >>> lb.zk.close()
-
     >>> with open('log.conf', 'w') as f:
     ...   f.write('loggers')
 



More information about the checkins mailing list