[Checkins] SVN: Sandbox/J1m/resumelb/s Added test for and debugged handling of workers disconnecting and

Jim Fulton jim at zope.com
Wed Nov 9 12:07:06 UTC 2011


Log message for revision 123324:
  Added test for and debugged handling of workers disconnecting and
  resubmission of GET and HEAD requests.
  

Changed:
  U   Sandbox/J1m/resumelb/setup.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.test

-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2011-11-09 10:29:50 UTC (rev 123323)
+++ Sandbox/J1m/resumelb/setup.py	2011-11-09 12:07:06 UTC (rev 123324)
@@ -13,9 +13,9 @@
 ##############################################################################
 name, version = 'zc.resumelb', '0'
 
-install_requires = ['setuptools', 'gevent']
+install_requires = ['setuptools', 'gevent', 'WebOb']
 extras_require = dict(
-    test=['zope.testing', 'bobo', 'WebOb', 'manuel', 'WebTest'])
+    test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
 
 entry_points = """
 [console_scripts]

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2011-11-09 10:29:50 UTC (rev 123323)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2011-11-09 12:07:06 UTC (rev 123324)
@@ -5,16 +5,27 @@
 import gevent.server
 import logging
 import sys
+import webob
 import zc.resumelb.util
 
 block_size = 1<<16
 
 logger = logging.getLogger(__name__)
 
+retry_methods = set(('GET', 'HEAD'))
+
+default_disconnect_message = """
+The server was unable to handle your request due to a transient failure.
+Please try again.
+"""
+
 class LB:
 
-    def __init__(self, worker_addr, classifier):
+    def __init__(self, worker_addr, classifier,
+                 disconnect_message=default_disconnect_message
+                 ):
         self.classifier = classifier
+        self.disconnect_message = disconnect_message
         self.pool = Pool()
         self.worker_server = gevent.server.StreamServer(
             worker_addr, self.handle_worker)
@@ -35,11 +46,18 @@
             except worker.Disconnected:
                 # XXX need to be more careful about whether
                 # start_response was called.
-                if int(env.get(CONTENT_LENGTH, None)) == 0:
+                if (int(env.get('CONTENT_LENGTH', 0)) == 0 and
+                    env.get('REQUEST_METHOD') in retry_methods
+                    ):
                     logger.info("retrying %s", env)
                 else:
-                    raise
-            finally:
+                    return webob.Response(
+                        status = '502 Bad Gateway',
+                        content_type= 'text/html',
+                        body = ("<html><body>%s</body></html>"
+                                % self.disconnect_message)
+                        )(env, start_response)
+            else:
                 self.pool.put(worker)
 
 class Pool:

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2011-11-09 10:29:50 UTC (rev 123323)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2011-11-09 12:07:06 UTC (rev 123324)
@@ -170,5 +170,74 @@
     >>> g2.value.status, g2.value.body == '2'*10000
     ('200 OK', True)
 
+Worker disconnection
+====================
 
+When a worker disconnects from a running lb, any pending GET or HEAD
+requests are resubmitted to another worker. All other requests
+generate a 502 (Bad Gateway) response.
 
+    >>> greenlets = []
+    >>> for method in ('GET', 'HEAD', 'PUT', 'POST', 'DELETE',
+    ...                'OPTIONS', 'TRACE'):
+    ...     app = webtest.TestApp(lb.handle_wsgi)
+    ...     greenlets.append(
+    ...         gevent.spawn(app.request, '/hi.html', method=method,
+    ...                      headers=[('Host', 'h1.com')], status='*'))
+    ...     rno, data = read_message(worker1)
+    ...     rno2, blank = read_message(worker1)
+    ...     if rno2 != rno or blank != '':
+    ...         print 'oops', (rno2, blank)
+    ...     print rno, type(data)
+    3 <type 'dict'>
+    4 <type 'dict'>
+    5 <type 'dict'>
+    6 <type 'dict'>
+    7 <type 'dict'>
+    8 <type 'dict'>
+    9 <type 'dict'>
+
+Now, we'll disconnect worker1:
+
+    >>> worker1.close()
+
+The GET and HEAD requests will be sent to worker2:
+
+    >>> rno1, env1 = read_message(worker2)
+    >>> rno, blank = read_message(worker2)
+    ... if rno != rno1 or blank != '':
+    ...     print 'oops', (rno, blank)
+
+    >>> rno2, env2 = read_message(worker2)
+    >>> rno, blank = read_message(worker2)
+    ... if rno != rno2 or blank != '':
+    ...     print 'oops', (rno, blank)
+
+    >>> sorted((env1['REQUEST_METHOD'], env2['REQUEST_METHOD']))
+    ['GET', 'HEAD']
+
+    >>> response = webob.Response('Hello test\n')
+    >>> for (rno, env) in ((rno1, env1), (rno2, env2)):
+    ...     write_message(worker2, rno,
+    ...                   (response.status, response.headers.items()))
+    ...     if env['REQUEST_METHOD'] == 'GET':
+    ...         write_message(worker2, rno, response.body)
+    ...     write_message(worker2, rno, '')
+
+    >>> for g in greenlets:
+    ...    g.join()
+    ...    print repr(g.value)
+    <200 OK text/html body='Hello test\n'>
+    <200 OK text/html no body>
+    <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+    <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+    <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+    <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+    <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+
+    >>> print greenlets[2].value.body
+    <html><body>
+    The server was unable to handle your request due to a transient failure.
+    Please try again.
+    </body></html>
+



More information about the checkins mailing list