[Checkins] SVN: Sandbox/J1m/resumelb/s Added test for and debugged handling of workers disconnecting and
Jim Fulton
jim at zope.com
Wed Nov 9 12:07:06 UTC 2011
Log message for revision 123324:
Added test for and debugged handling of workers disconnecting and
resubmission of GET and HEAD requests.
Changed:
U Sandbox/J1m/resumelb/setup.py
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py 2011-11-09 10:29:50 UTC (rev 123323)
+++ Sandbox/J1m/resumelb/setup.py 2011-11-09 12:07:06 UTC (rev 123324)
@@ -13,9 +13,9 @@
##############################################################################
name, version = 'zc.resumelb', '0'
-install_requires = ['setuptools', 'gevent']
+install_requires = ['setuptools', 'gevent', 'WebOb']
extras_require = dict(
- test=['zope.testing', 'bobo', 'WebOb', 'manuel', 'WebTest'])
+ test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
entry_points = """
[console_scripts]
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2011-11-09 10:29:50 UTC (rev 123323)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2011-11-09 12:07:06 UTC (rev 123324)
@@ -5,16 +5,27 @@
import gevent.server
import logging
import sys
+import webob
import zc.resumelb.util
block_size = 1<<16
logger = logging.getLogger(__name__)
+retry_methods = set(('GET', 'HEAD'))
+
+default_disconnect_message = """
+The server was unable to handle your request due to a transient failure.
+Please try again.
+"""
+
class LB:
- def __init__(self, worker_addr, classifier):
+ def __init__(self, worker_addr, classifier,
+ disconnect_message=default_disconnect_message
+ ):
self.classifier = classifier
+ self.disconnect_message = disconnect_message
self.pool = Pool()
self.worker_server = gevent.server.StreamServer(
worker_addr, self.handle_worker)
@@ -35,11 +46,18 @@
except worker.Disconnected:
# XXX need to be more careful about whether
# start_response was called.
- if int(env.get(CONTENT_LENGTH, None)) == 0:
+ if (int(env.get('CONTENT_LENGTH', 0)) == 0 and
+ env.get('REQUEST_METHOD') in retry_methods
+ ):
logger.info("retrying %s", env)
else:
- raise
- finally:
+ return webob.Response(
+ status = '502 Bad Gateway',
+ content_type= 'text/html',
+ body = ("<html><body>%s</body></html>"
+ % self.disconnect_message)
+ )(env, start_response)
+ else:
self.pool.put(worker)
class Pool:
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2011-11-09 10:29:50 UTC (rev 123323)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2011-11-09 12:07:06 UTC (rev 123324)
@@ -170,5 +170,74 @@
>>> g2.value.status, g2.value.body == '2'*10000
('200 OK', True)
+Worker disconnection
+====================
+When a worker disconnects from a running lb, any pending GET or HEAD
+requests are resubmitted to another worker. All other requests
+generate a 502 (Bad Gateway) response.
+ >>> greenlets = []
+ >>> for method in ('GET', 'HEAD', 'PUT', 'POST', 'DELETE',
+ ... 'OPTIONS', 'TRACE'):
+ ... app = webtest.TestApp(lb.handle_wsgi)
+ ... greenlets.append(
+ ... gevent.spawn(app.request, '/hi.html', method=method,
+ ... headers=[('Host', 'h1.com')], status='*'))
+ ... rno, data = read_message(worker1)
+ ... rno2, blank = read_message(worker1)
+ ... if rno2 != rno or blank != '':
+ ... print 'oops', (rno2, blank)
+ ... print rno, type(data)
+ 3 <type 'dict'>
+ 4 <type 'dict'>
+ 5 <type 'dict'>
+ 6 <type 'dict'>
+ 7 <type 'dict'>
+ 8 <type 'dict'>
+ 9 <type 'dict'>
+
+Now, we'll disconnect worker1:
+
+ >>> worker1.close()
+
+The GET and HEAD requests will be sent to worker2:
+
+ >>> rno1, env1 = read_message(worker2)
+ >>> rno, blank = read_message(worker2)
+ ... if rno != rno1 or blank != '':
+ ... print 'oops', (rno, blank)
+
+ >>> rno2, env2 = read_message(worker2)
+ >>> rno, blank = read_message(worker2)
+ ... if rno != rno2 or blank != '':
+ ... print 'oops', (rno, blank)
+
+ >>> sorted((env1['REQUEST_METHOD'], env2['REQUEST_METHOD']))
+ ['GET', 'HEAD']
+
+ >>> response = webob.Response('Hello test\n')
+ >>> for (rno, env) in ((rno1, env1), (rno2, env2)):
+ ... write_message(worker2, rno,
+ ... (response.status, response.headers.items()))
+ ... if env['REQUEST_METHOD'] == 'GET':
+ ... write_message(worker2, rno, response.body)
+ ... write_message(worker2, rno, '')
+
+ >>> for g in greenlets:
+ ... g.join()
+ ... print repr(g.value)
+ <200 OK text/html body='Hello test\n'>
+ <200 OK text/html no body>
+ <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+ <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+ <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+ <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+ <502 Bad Gateway text/html body='<html><bo...tml>'/118>
+
+ >>> print greenlets[2].value.body
+ <html><body>
+ The server was unable to handle your request due to a transient failure.
+ Please try again.
+ </body></html>
+
More information about the checkins
mailing list