[Checkins] SVN: Sandbox/J1m/resumelb/ Added tracelog support.

Jim Fulton jim at zope.com
Sun Feb 5 20:58:11 UTC 2012


Log message for revision 124307:
  Added tracelog support.
  
  Also separated worker config into static config and run-time config.
  
  Made threads and tracelog part of static config. However, tracelogs
  seem to be rather expensive, at least when writing to disk on Mac OS
  X, so maybe they should be dynamic config. OTOH, should try other
  tracelog configs, like maybe UDP.
  

Changed:
  U   Sandbox/J1m/resumelb/buildout.cfg
  U   Sandbox/J1m/resumelb/setup.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/zk.py

-=-
Modified: Sandbox/J1m/resumelb/buildout.cfg
===================================================================
--- Sandbox/J1m/resumelb/buildout.cfg	2012-02-04 22:32:04 UTC (rev 124306)
+++ Sandbox/J1m/resumelb/buildout.cfg	2012-02-05 20:58:09 UTC (rev 124307)
@@ -37,6 +37,8 @@
   [server:main]
   use = egg:zc.resumelb
   address = ${lb:worker_addr}
+  threads = 1
+  tracelog = tracelog
 
 [worker]
 recipe = zc.zdaemonrecipe

Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2012-02-04 22:32:04 UTC (rev 124306)
+++ Sandbox/J1m/resumelb/setup.py	2012-02-05 20:58:09 UTC (rev 124307)
@@ -17,7 +17,7 @@
     'setuptools', 'gevent >=1.0b1', 'WebOb', 'zc.thread', 'zc.parse_addr',
     'zc.mappingobject', 'llist']
 extras_require = dict(
-    test=['zope.testing', 'bobo', 'manuel', 'WebTest', 'zc.zk',
+    test=['zope.testing', 'bobo', 'manuel', 'WebTest', 'zc.zk [test]',
           'ZConfig', 'mock'])
 
 entry_points = """

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2012-02-04 22:32:04 UTC (rev 124306)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2012-02-05 20:58:09 UTC (rev 124307)
@@ -50,13 +50,18 @@
         app_iter=['', 'hello world\n'],
         content_length=12)
 
-
-
 @bobo.query('/sleep.html')
-def sleep(dur=0):
+def sleep(bobo_request, dur=0):
     time.sleep(float(dur))
+    if 'tracelog' in bobo_request.environ:
+        bobo_request.environ['tracelog']('T', 'test')
     return 'hello world\n'
 
+@bobo.query('/gsleep.html')
+def gsleep(dur=0):
+    gevent.sleep(float(dur))
+    return 'hello world\n'
+
 def app():
     return bobo.Application(bobo_resources=__name__)
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-02-04 22:32:04 UTC (rev 124306)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-02-05 20:58:09 UTC (rev 124307)
@@ -1,4 +1,5 @@
 import cStringIO
+import datetime
 import errno
 import gevent
 import gevent.hub
@@ -24,7 +25,8 @@
 
 class Worker:
 
-    def __init__(self, app, addr, settings, resume_file=None):
+    def __init__(self, app, addr, settings,
+                 resume_file=None, threads=None, tracelog=None):
         self.app = app
         self.settings = zc.mappingobject.mappingobject(settings)
         self.worker_request_number = 0
@@ -40,18 +42,74 @@
         self.time_ring_pos = 0
         self.connections = set()
 
-        if settings.get('threads'):
-            pool = gevent.threadpool.ThreadPool(settings['threads'])
-            self.apply = pool.apply
+        if threads:
+            self.threadpool = gevent.threadpool.ThreadPool(threads)
+            pool_apply = self.threadpool.apply
         else:
-            self.apply = apply
+            pool_apply = None
 
+        def call_app(rno, env):
+            response = [0]
+            def start_response(status, headers, exc_info=None):
+                assert not exc_info # XXX
+                response[0] = (status, headers)
+            body = app(env, start_response)
+            return response[0], body
+
+        if tracelog:
+            info = logging.getLogger(tracelog).info
+            no_message_format = '%s %s %s'
+            message_format = '%s %s %s %s'
+            now = datetime.datetime.now
+            def log(rno, code, message=None):
+                if message:
+                    info(message_format, code, rno, now(), message)
+                else:
+                    info(no_message_format, code, rno, now())
+            tracelog = log
+
+            def call_app_w_tracelog(rno, env):
+                log(rno, 'C')
+                env['tracelog'] = (
+                    lambda code, message=None: log(rno, code, message)
+                    )
+                response, body = call_app(rno, env)
+                content_length = [v for (h, v) in response[1]
+                                  if h.lower() == 'content-length']
+                content_length = content_length[-1] if content_length else '?'
+                log(rno, 'A', "%s %s" % (response[0], content_length))
+                def body_iter():
+                    try:
+                        for data in body:
+                            yield data
+                    finally:
+                        if hasattr(body, 'close'):
+                            body.close()
+                        log(rno, 'E')
+                return response, body_iter()
+
+            if threads:
+                def call_app_w_threads(rno, env):
+                    log(rno, 'I', env.get('CONTENT_LENGTH', 0))
+                    return pool_apply(call_app_w_tracelog, (rno, env))
+                self.call_app = call_app_w_threads
+            else:
+                self.call_app = call_app_w_tracelog
+        elif threads:
+            self.call_app = lambda rno, env: pool_apply(call_app, (rno, env))
+        else:
+            self.call_app = call_app
+
+        self.tracelog = tracelog
+
         self.server = gevent.server.StreamServer(addr, self.handle_connection)
         self.server.start()
         self.addr = addr[0], self.server.server_port
 
     def stop(self):
         self.server.stop()
+        if hasattr(self, 'threadpool'):
+            self.threadpool.kill()
 
     def handle_connection(self, sock, addr):
         try:
@@ -84,11 +142,17 @@
 
     def handle(self, conn, rno, get, env):
         try:
-            f = cStringIO.StringIO()
-            env['wsgi.input'] = f
+            if self.tracelog:
+                query_string = env.get('QUERY_STRING')
+                url = env['PATH_INFO']
+                if query_string:
+                    url += '?' + query_string
+                self.tracelog(rno, 'B', '%s %s' % (env['REQUEST_METHOD'], url))
+
             env['wsgi.errors'] = sys.stderr
 
-            # XXX We're buffering input.  It maybe should to have option not to.
+            # XXX We're buffering input. Maybe should to have option not to.
+            f = cStringIO.StringIO()
             while 1:
                 data = get()
                 if data:
@@ -99,18 +163,14 @@
                 else:
                     break
             f.seek(0)
+            env['wsgi.input'] = f
 
-            response = [0]
-            def start_response(status, headers, exc_info=None):
-                assert not exc_info # XXX
-                response[0] = (status, headers)
-
+            response, body = self.call_app(rno, env)
             try:
                 requests = conn.readers
-                body = self.apply(self.app, (env, start_response))
                 if rno not in requests:
                     return # cancelled
-                conn.put((rno, response[0]))
+                conn.put((rno, response))
                 for data in body:
                     if rno not in requests:
                         return # cancelled
@@ -147,6 +207,9 @@
 
             except zc.resumelb.util.Disconnected:
                 return # whatever
+            finally:
+                if hasattr(body, 'close'):
+                    body.close()
         except:
             error('handle_connection')
         finally:
@@ -170,10 +233,11 @@
                     pass
 
 
-def server_runner(app, global_conf, address, history=500, threads=1):
+def server_runner(app, global_conf, address, history=500, threads=0, **kw):
     # paste deploy hook
     logging.basicConfig(level=logging.INFO)
     host, port = address.split(':')
-    Worker(app, (host, int(port)), dict(history=history, threads=threads)
-           ).server.serve_forever()
+    Worker(app, (host, int(port)), dict(history=history),
+           threads=threads and int(threads),
+           **kw).server.serve_forever()
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2012-02-04 22:32:04 UTC (rev 124306)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2012-02-05 20:58:09 UTC (rev 124307)
@@ -486,7 +486,76 @@
 marker.
 
 cleanup
---------------------------------------------------------------------
 
+    >>> worker.stop()
 
+Tracelog and thread-pool support
+--------------------------------
+
+Workers support tracelogs and thread pools.
+
+We're gonna be tricky and fake time, taking advantage of the knowledge
+that the worker constructor caches datetime.now when writing to trace logs:
+
+    >>> import datetime, mock
+    >>> now = datetime.datetime(2012, 2, 5, 1, 2, 3, 456)
+    >>> with mock.patch('datetime.datetime') as dtmock:
+    ...     dtmock.now.side_effect = lambda : now
+    ...     worker = zc.resumelb.worker.Worker(
+    ...       zc.resumelb.tests.app(), ('127.0.0.1', 0), dict(history=5),
+    ...       tracelog='tracelog', threads=1)
+
+    >>> worker_socket = gevent.socket.create_connection(worker.addr)
+    >>> pprint(read_message(worker_socket))
+    (0, {})
+
+By passing a threads argument of 1, we said we want a thread pool of
+size one. This will serialize request processing.
+
+If a logger name is passed as the tracelog argument, then trace logs
+will be written to the logger with that name.
+
+    >>> import logging, sys
+    >>> logger = logging.getLogger('tracelog')
+    >>> handler = logging.StreamHandler(sys.stdout)
+    >>> logger.addHandler(handler)
+    >>> logger.setLevel(logging.INFO)
+
+Now, we'll make some requests:
+
+    >>> write_message(worker_socket, 1, newenv('', '/sleep.html?dur=.1'))
+    >>> gevent.sleep(.01)
+    B 1 2012-02-05 01:02:03.000456 GET /sleep.html?dur=.1
+    >>> now += datetime.timedelta(microseconds=10000)
+    >>> write_message(worker_socket, 2, newenv('', '/sleep.html?dur=.1'))
+    >>> gevent.sleep(.01)
+    B 2 2012-02-05 01:02:03.010456 GET /sleep.html?dur=.1
+    >>> now += datetime.timedelta(microseconds=10000)
+    >>> write_message(worker_socket, 2, '')
+    >>> gevent.sleep(.01)
+    I 2 2012-02-05 01:02:03.020456
+    C 2 2012-02-05 01:02:03.020456
+    >>> now += datetime.timedelta(microseconds=10000)
+    >>> write_message(worker_socket, 1, '')
+    >>> read_message(worker_socket) # doctest: +ELLIPSIS
+    I 1 2012-02-05 01:02:03.030456
+    T 2 2012-02-05 01:02:03.030456 test
+    A 2 2012-02-05 01:02:03.030456 200 OK 12
+    C 1 2012-02-05 01:02:03.030456
+    E 2 2012-02-05 01:02:03.030456
+    (2, ('200 OK', [('Content-Type', ...
+    >>> read_message(worker_socket)
+    (2, 'hello world\n')
+    >>> read_message(worker_socket)
+    (2, '')
+    >>> read_message(worker_socket) # doctest: +ELLIPSIS
+    T 1 2012-02-05 01:02:03.030456 test
+    A 1 2012-02-05 01:02:03.030456 200 OK 12
+    E 1 2012-02-05 01:02:03.030456
+    (1, ('200 OK', [...('Content-Length', '12')]))
+
+Cleanup:
+
+    >>> logger.removeHandler(handler)
+    >>> logger.setLevel(logging.NOTSET)
     >>> worker.stop()

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-02-04 22:32:04 UTC (rev 124306)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-02-05 20:58:09 UTC (rev 124307)
@@ -22,8 +22,7 @@
 import zc.zk
 
 def worker(app, global_conf, zookeeper, path, loggers=None, address=':0',
-           resume_file=None,
-           run=True):
+           threads=None, run=True, **kw):
     """Paste deploy server runner
     """
     if loggers:
@@ -33,7 +32,9 @@
     zk = zc.zk.ZooKeeper(zookeeper)
     address = zc.parse_addr.parse_addr(address)
     from zc.resumelb.worker import Worker
-    worker = Worker(app, address, zk.properties(path), resume_file)
+    worker = Worker(app, address, zk.properties(path),
+                    threads=threads and int(threads),
+                    **kw)
     zk.register_server(path+'/providers', worker.addr)
     worker.zk = zk
     if run:



More information about the checkins mailing list