[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Fixed: trace log request ids weren't assigned correctly when using multiple load balancers

jim cvs-admin at zope.org
Tue Mar 27 14:50:29 UTC 2012


Log message for revision 124749:
  Fixed: trace log request ids weren't assigned correctly when using
  multiple load balancers.
  
  Fixed a test race condition that caused spurious test failures.
  

Changed:
  U   zc.resumelb/trunk/src/zc/resumelb/tests.py
  U   zc.resumelb/trunk/src/zc/resumelb/worker.py
  U   zc.resumelb/trunk/src/zc/resumelb/worker.test

-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-03-26 22:08:08 UTC (rev 124748)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-03-27 14:50:25 UTC (rev 124749)
@@ -55,13 +55,22 @@
         content_length=12)
 
 @bobo.query('/sleep.html')
-def sleep(bobo_request, dur=0):
+def sleep(bobo_request, dur=0, size=1):
     time.sleep(float(dur))
     if 'tracelog' in bobo_request.environ:
         bobo_request.environ['tracelog'].log('test', 'T')
         bobo_request.environ['tracelog'].log('test2')
-    return 'hello world\n'
 
+    size = int(size)
+    if size > 1:
+        r = webob.Response()
+        r.app_iter = ('hello world\n' for i in range(size))
+        r.content_length = 12*size
+        r.content_type = 'text/html'
+        return r
+    else:
+        return 'hello world\n'
+
 @bobo.query('/gsleep.html')
 def gsleep(dur=0):
     gevent.sleep(float(dur))

Modified: zc.resumelb/trunk/src/zc/resumelb/worker.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/worker.py	2012-03-26 22:08:08 UTC (rev 124748)
+++ zc.resumelb/trunk/src/zc/resumelb/worker.py	2012-03-27 14:50:25 UTC (rev 124749)
@@ -55,7 +55,8 @@
         else:
             pool_apply = None
 
-        def call_app(rno, env):
+        self.trno = 0
+        def call_app(trno, env):
             response = [0]
             env['zc.resumelb.time'] = time.time()
             def start_response(status, headers, exc_info=None):
@@ -69,30 +70,30 @@
             no_message_format = '%s %s %s'
             message_format = '%s %s %s %s'
             now = datetime.datetime.now
-            def log(rno, code, message=None):
+            def log(trno, code, message=None):
                 if message:
-                    info(message_format, code, rno, now(), message)
+                    info(message_format, code, trno, now(), message)
                 else:
-                    info(no_message_format, code, rno, now())
+                    info(no_message_format, code, trno, now())
             tracelog = log
 
             class ApplicationTraceLog(object):
 
-                def __init__(self, rno):
-                    self.rno = rno
+                def __init__(self, trno):
+                    self.trno = trno
 
                 def log(self, msg=None, code='-'):
-                    log(self.rno, code, msg)
+                    log(self.trno, code, msg)
 
 
-            def call_app_w_tracelog(rno, env):
-                log(rno, 'C')
-                env[tracelog_key] = ApplicationTraceLog(rno)
-                response, body = call_app(rno, env)
+            def call_app_w_tracelog(trno, env):
+                log(trno, 'C')
+                env[tracelog_key] = ApplicationTraceLog(trno)
+                response, body = call_app(trno, env)
                 content_length = [v for (h, v) in response[1]
                                   if h.lower() == 'content-length']
                 content_length = content_length[-1] if content_length else '?'
-                log(rno, 'A', "%s %s" % (response[0], content_length))
+                log(trno, 'A', "%s %s" % (response[0], content_length))
                 def body_iter():
                     try:
                         for data in body:
@@ -100,18 +101,18 @@
                     finally:
                         if hasattr(body, 'close'):
                             body.close()
-                        log(rno, 'E')
+                        log(trno, 'E')
                 return response, body_iter()
 
             if threads:
-                def call_app_w_threads(rno, env):
-                    log(rno, 'I', env.get('CONTENT_LENGTH', 0))
-                    return pool_apply(call_app_w_tracelog, (rno, env))
+                def call_app_w_threads(trno, env):
+                    log(trno, 'I', env.get('CONTENT_LENGTH', 0))
+                    return pool_apply(call_app_w_tracelog, (trno, env))
                 self.call_app = call_app_w_threads
             else:
                 self.call_app = call_app_w_tracelog
         elif threads:
-            self.call_app = lambda rno, env: pool_apply(call_app, (rno, env))
+            self.call_app = lambda trno, env: pool_apply(call_app, (trno, env))
         else:
             self.call_app = call_app
 
@@ -183,11 +184,15 @@
     def handle(self, conn, rno, get, env):
         try:
             if self.tracelog:
+                self.trno += 1
+                trno = self.trno
                 query_string = env.get('QUERY_STRING')
                 url = env['PATH_INFO']
                 if query_string:
                     url += '?' + query_string
-                self.tracelog(rno, 'B', '%s %s' % (env['REQUEST_METHOD'], url))
+                self.tracelog(trno, 'B', '%s %s' % (env['REQUEST_METHOD'], url))
+            else:
+                trno = 0
 
             env['wsgi.errors'] = sys.stderr
 
@@ -205,7 +210,7 @@
             f.seek(0)
             env['wsgi.input'] = f
 
-            response, body = self.call_app(rno, env)
+            response, body = self.call_app(trno, env)
             try:
                 requests = conn.readers
                 if rno not in requests:

Modified: zc.resumelb/trunk/src/zc/resumelb/worker.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/worker.test	2012-03-26 22:08:08 UTC (rev 124748)
+++ zc.resumelb/trunk/src/zc/resumelb/worker.test	2012-03-27 14:50:25 UTC (rev 124749)
@@ -579,7 +579,7 @@
     >>> pprint(read_message(worker_socket))
     (0, {})
 
-By passing a threads object, we said we want a thread pool of size
+By passing a threads option, we said we want a thread pool of size
 one. This will serialize request processing.
 
 If a tracelog argument is passed a logger name, then trace logs will
@@ -593,39 +593,63 @@
 
 Now, we'll make some requests:
 
-    >>> write_message(worker_socket, 1, newenv('', '/sleep.html?dur=.1'))
+    >>> write_message(worker_socket, 11, newenv('', '/sleep.html?dur=.1'))
     >>> gevent.sleep(.01)
     B 1 2012-02-05 01:02:03.000456 GET /sleep.html?dur=.1
     >>> now += datetime.timedelta(microseconds=10000)
-    >>> write_message(worker_socket, 2, newenv('', '/sleep.html?dur=.1'))
+    >>> write_message(worker_socket, 22,
+    ...               newenv('', '/sleep.html?dur=.1&size=11'))
     >>> gevent.sleep(.01)
-    B 2 2012-02-05 01:02:03.010456 GET /sleep.html?dur=.1
+    B 2 2012-02-05 01:02:03.010456 GET /sleep.html?dur=.1&size=11
     >>> now += datetime.timedelta(microseconds=10000)
-    >>> write_message(worker_socket, 2, '')
+    >>> write_message(worker_socket, 22, '')
     >>> gevent.sleep(.01)
     I 2 2012-02-05 01:02:03.020456
     C 2 2012-02-05 01:02:03.020456
     >>> now += datetime.timedelta(microseconds=10000)
-    >>> write_message(worker_socket, 1, '')
+    >>> write_message(worker_socket, 11, '')
     >>> read_message(worker_socket) # doctest: +ELLIPSIS
     I 1 2012-02-05 01:02:03.030456
     T 2 2012-02-05 01:02:03.030456 test
     - 2 2012-02-05 01:02:03.030456 test2
-    A 2 2012-02-05 01:02:03.030456 200 OK 12
+    A 2 2012-02-05 01:02:03.030456 200 OK 132
     C 1 2012-02-05 01:02:03.030456
     E 2 2012-02-05 01:02:03.030456
-    (2, ('200 OK', [('Content-Type', ...
-    >>> read_message(worker_socket)
-    (2, 'hello world\n')
-    >>> read_message(worker_socket)
-    (2, '')
+    (22, ('200 OK', [('Content-Length', ...
+
+    >>> while 1:
+    ...     rno, data = read_message(worker_socket)
+    ...     if rno != 22:
+    ...         print 'oops', rno
+    ...         break
+    ...     if not data: break
+
     >>> read_message(worker_socket) # doctest: +ELLIPSIS
     T 1 2012-02-05 01:02:03.030456 test
     - 1 2012-02-05 01:02:03.030456 test2
     A 1 2012-02-05 01:02:03.030456 200 OK 12
     E 1 2012-02-05 01:02:03.030456
-    (1, ('200 OK', [...('Content-Length', '12')]))
+    (11, ('200 OK', [...('Content-Length', '12')]))
 
+Note that the tracelog request ids are distinct from the request
+numbers passed in the network protocol.  This is because network
+request numbers are assigned by load balancers and wouldn't be unique
+if there were multiple load balancers.  Let's make a new connection
+to see this in action.
+
+    >>> now += datetime.timedelta(microseconds=10000)
+    >>> worker_socket2 = gevent.socket.create_connection(worker.addr)
+    >>> _ = read_message(worker_socket2)
+    >>> write_message(worker_socket2, 22, newenv('', '/hi.html'), '')
+    >>> read_message(worker_socket2) # doctest: +ELLIPSIS
+    B 3 2012-02-05 01:02:03.040456 GET /hi.html
+    I 3 2012-02-05 01:02:03.040456
+    C 3 2012-02-05 01:02:03.040456
+    A 3 2012-02-05 01:02:03.040456 200 OK 79
+    E 3 2012-02-05 01:02:03.040456
+    (22, ('200 OK', [('Content-Type', ...
+
+
 Also, note that the worker score is based on the time spent in app. It
 doesn't include time waiting.  We can see this when the worker sends
 us its resume:
@@ -643,6 +667,8 @@
 
     >>> logger.removeHandler(handler)
     >>> logger.setLevel(logging.NOTSET)
+    >>> worker_socket.close()
+    >>> worker_socket2.close()
     >>> worker.stop()
 
 Updating worker settings



More information about the checkins mailing list