[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Fixed: trace log request ids weren't assigned correctly when using
jim
cvs-admin at zope.org
Tue Mar 27 14:50:29 UTC 2012
Log message for revision 124749:
Fixed: trace log request ids weren't assigned correctly when using
multiple load balancers.
Fixed a test race condition that caused spurious test failures.
Changed:
U zc.resumelb/trunk/src/zc/resumelb/tests.py
U zc.resumelb/trunk/src/zc/resumelb/worker.py
U zc.resumelb/trunk/src/zc/resumelb/worker.test
-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-03-26 22:08:08 UTC (rev 124748)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-03-27 14:50:25 UTC (rev 124749)
@@ -55,13 +55,22 @@
content_length=12)
@bobo.query('/sleep.html')
-def sleep(bobo_request, dur=0):
+def sleep(bobo_request, dur=0, size=1):
time.sleep(float(dur))
if 'tracelog' in bobo_request.environ:
bobo_request.environ['tracelog'].log('test', 'T')
bobo_request.environ['tracelog'].log('test2')
- return 'hello world\n'
+ size = int(size)
+ if size > 1:
+ r = webob.Response()
+ r.app_iter = ('hello world\n' for i in range(size))
+ r.content_length = 12*size
+ r.content_type = 'text/html'
+ return r
+ else:
+ return 'hello world\n'
+
@bobo.query('/gsleep.html')
def gsleep(dur=0):
gevent.sleep(float(dur))
Modified: zc.resumelb/trunk/src/zc/resumelb/worker.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/worker.py 2012-03-26 22:08:08 UTC (rev 124748)
+++ zc.resumelb/trunk/src/zc/resumelb/worker.py 2012-03-27 14:50:25 UTC (rev 124749)
@@ -55,7 +55,8 @@
else:
pool_apply = None
- def call_app(rno, env):
+ self.trno = 0
+ def call_app(trno, env):
response = [0]
env['zc.resumelb.time'] = time.time()
def start_response(status, headers, exc_info=None):
@@ -69,30 +70,30 @@
no_message_format = '%s %s %s'
message_format = '%s %s %s %s'
now = datetime.datetime.now
- def log(rno, code, message=None):
+ def log(trno, code, message=None):
if message:
- info(message_format, code, rno, now(), message)
+ info(message_format, code, trno, now(), message)
else:
- info(no_message_format, code, rno, now())
+ info(no_message_format, code, trno, now())
tracelog = log
class ApplicationTraceLog(object):
- def __init__(self, rno):
- self.rno = rno
+ def __init__(self, trno):
+ self.trno = trno
def log(self, msg=None, code='-'):
- log(self.rno, code, msg)
+ log(self.trno, code, msg)
- def call_app_w_tracelog(rno, env):
- log(rno, 'C')
- env[tracelog_key] = ApplicationTraceLog(rno)
- response, body = call_app(rno, env)
+ def call_app_w_tracelog(trno, env):
+ log(trno, 'C')
+ env[tracelog_key] = ApplicationTraceLog(trno)
+ response, body = call_app(trno, env)
content_length = [v for (h, v) in response[1]
if h.lower() == 'content-length']
content_length = content_length[-1] if content_length else '?'
- log(rno, 'A', "%s %s" % (response[0], content_length))
+ log(trno, 'A', "%s %s" % (response[0], content_length))
def body_iter():
try:
for data in body:
@@ -100,18 +101,18 @@
finally:
if hasattr(body, 'close'):
body.close()
- log(rno, 'E')
+ log(trno, 'E')
return response, body_iter()
if threads:
- def call_app_w_threads(rno, env):
- log(rno, 'I', env.get('CONTENT_LENGTH', 0))
- return pool_apply(call_app_w_tracelog, (rno, env))
+ def call_app_w_threads(trno, env):
+ log(trno, 'I', env.get('CONTENT_LENGTH', 0))
+ return pool_apply(call_app_w_tracelog, (trno, env))
self.call_app = call_app_w_threads
else:
self.call_app = call_app_w_tracelog
elif threads:
- self.call_app = lambda rno, env: pool_apply(call_app, (rno, env))
+ self.call_app = lambda trno, env: pool_apply(call_app, (trno, env))
else:
self.call_app = call_app
@@ -183,11 +184,15 @@
def handle(self, conn, rno, get, env):
try:
if self.tracelog:
+ self.trno += 1
+ trno = self.trno
query_string = env.get('QUERY_STRING')
url = env['PATH_INFO']
if query_string:
url += '?' + query_string
- self.tracelog(rno, 'B', '%s %s' % (env['REQUEST_METHOD'], url))
+ self.tracelog(trno, 'B', '%s %s' % (env['REQUEST_METHOD'], url))
+ else:
+ trno = 0
env['wsgi.errors'] = sys.stderr
@@ -205,7 +210,7 @@
f.seek(0)
env['wsgi.input'] = f
- response, body = self.call_app(rno, env)
+ response, body = self.call_app(trno, env)
try:
requests = conn.readers
if rno not in requests:
Modified: zc.resumelb/trunk/src/zc/resumelb/worker.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/worker.test 2012-03-26 22:08:08 UTC (rev 124748)
+++ zc.resumelb/trunk/src/zc/resumelb/worker.test 2012-03-27 14:50:25 UTC (rev 124749)
@@ -579,7 +579,7 @@
>>> pprint(read_message(worker_socket))
(0, {})
-By passing a threads object, we said we want a thread pool of size
+By passing a threads option, we said we want a thread pool of size
one. This will serialize request processing.
If a tracelog argument is passed a logger name, then trace logs will
@@ -593,39 +593,63 @@
Now, we'll make some requests:
- >>> write_message(worker_socket, 1, newenv('', '/sleep.html?dur=.1'))
+ >>> write_message(worker_socket, 11, newenv('', '/sleep.html?dur=.1'))
>>> gevent.sleep(.01)
B 1 2012-02-05 01:02:03.000456 GET /sleep.html?dur=.1
>>> now += datetime.timedelta(microseconds=10000)
- >>> write_message(worker_socket, 2, newenv('', '/sleep.html?dur=.1'))
+ >>> write_message(worker_socket, 22,
+ ... newenv('', '/sleep.html?dur=.1&size=11'))
>>> gevent.sleep(.01)
- B 2 2012-02-05 01:02:03.010456 GET /sleep.html?dur=.1
+ B 2 2012-02-05 01:02:03.010456 GET /sleep.html?dur=.1&size=11
>>> now += datetime.timedelta(microseconds=10000)
- >>> write_message(worker_socket, 2, '')
+ >>> write_message(worker_socket, 22, '')
>>> gevent.sleep(.01)
I 2 2012-02-05 01:02:03.020456
C 2 2012-02-05 01:02:03.020456
>>> now += datetime.timedelta(microseconds=10000)
- >>> write_message(worker_socket, 1, '')
+ >>> write_message(worker_socket, 11, '')
>>> read_message(worker_socket) # doctest: +ELLIPSIS
I 1 2012-02-05 01:02:03.030456
T 2 2012-02-05 01:02:03.030456 test
- 2 2012-02-05 01:02:03.030456 test2
- A 2 2012-02-05 01:02:03.030456 200 OK 12
+ A 2 2012-02-05 01:02:03.030456 200 OK 132
C 1 2012-02-05 01:02:03.030456
E 2 2012-02-05 01:02:03.030456
- (2, ('200 OK', [('Content-Type', ...
- >>> read_message(worker_socket)
- (2, 'hello world\n')
- >>> read_message(worker_socket)
- (2, '')
+ (22, ('200 OK', [('Content-Length', ...
+
+ >>> while 1:
+ ... rno, data = read_message(worker_socket)
+ ... if rno != 22:
+ ... print 'oops', rno
+ ... break
+ ... if not data: break
+
>>> read_message(worker_socket) # doctest: +ELLIPSIS
T 1 2012-02-05 01:02:03.030456 test
- 1 2012-02-05 01:02:03.030456 test2
A 1 2012-02-05 01:02:03.030456 200 OK 12
E 1 2012-02-05 01:02:03.030456
- (1, ('200 OK', [...('Content-Length', '12')]))
+ (11, ('200 OK', [...('Content-Length', '12')]))
+Note that the tracelog request ids are distinct from the request
+numbers passed in the network protocol. This is because network
+request numbers are assigned by load balancers and wouldn't be unique
+if there were multiple lbs. Let's make a new connection, as we'll see
+this some more.
+
+ >>> now += datetime.timedelta(microseconds=10000)
+ >>> worker_socket2 = gevent.socket.create_connection(worker.addr)
+ >>> _ = read_message(worker_socket2)
+ >>> write_message(worker_socket2, 22, newenv('', '/hi.html'), '')
+ >>> read_message(worker_socket2) # doctest: +ELLIPSIS
+ B 3 2012-02-05 01:02:03.040456 GET /hi.html
+ I 3 2012-02-05 01:02:03.040456
+ C 3 2012-02-05 01:02:03.040456
+ A 3 2012-02-05 01:02:03.040456 200 OK 79
+ E 3 2012-02-05 01:02:03.040456
+ (22, ('200 OK', [('Content-Type', ...
+
+
Also, note that the worker score is based on the time spent in app. It
doesn't include time waiting. We can see this when the worker sends
us its resume:
@@ -643,6 +667,8 @@
>>> logger.removeHandler(handler)
>>> logger.setLevel(logging.NOTSET)
+ >>> worker_socket.close()
+ >>> worker_socket2.close()
>>> worker.stop()
Updating worker settings
More information about the checkins
mailing list