[Checkins] SVN: Sandbox/J1m/resumelb/s Added worker tests.

Jim Fulton jim at zope.com
Mon Oct 31 10:56:27 UTC 2011


Log message for revision 123194:
  Added worker tests.
  

Changed:
  U   Sandbox/J1m/resumelb/setup.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/worker.test

-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2011-10-31 10:56:24 UTC (rev 123193)
+++ Sandbox/J1m/resumelb/setup.py	2011-10-31 10:56:27 UTC (rev 123194)
@@ -14,7 +14,7 @@
 name, version = 'zc.resumelb', '0'
 
 install_requires = ['setuptools', 'gevent']
-extras_require = dict(test=['zope.testing', 'bobo'])
+extras_require = dict(test=['zope.testing', 'bobo', 'WebOb', 'manuel'])
 
 entry_points = """
 [console_scripts]

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2011-10-31 10:56:24 UTC (rev 123193)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2011-10-31 10:56:27 UTC (rev 123194)
@@ -1,8 +1,59 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
 import bobo
+import doctest
+import gevent
+import manuel.capture
+import manuel.doctest
+import manuel.testing
 import os
+import time
+import unittest
+import webob
 
-@bobo.query
-def hi(bobo_request):
-    return "\n\n%s -> %s\n\n" % (
-        bobo_request.url, os.getpid())
+pid = os.getpid()
 
+@bobo.resource
+def hi(request):
+    body = request.environ['wsgi.input'].read()
+    return "\n\n%s -> %s %s %s\n\n" % (
+        request.url, pid, len(body), hash(body))
+
+@bobo.query('/gen.html')
+def gen(size=0):
+    size = int(size)
+    return webob.Response(
+        app_iter=['hello world\n'*1000]*size,
+        content_length=12000*size)
+
+@bobo.query('/sleep.html')
+def sleep(dur=0):
+    time.sleep(float(dur))
+    return 'hello world\n'
+
+def app():
+    return bobo.Application(bobo_resources=__name__)
+
+def setUp(test):
+    global pid
+    pid = 6115
+
+def test_suite():
+    return unittest.TestSuite((
+        manuel.testing.TestSuite(
+            manuel.doctest.Manuel() + manuel.capture.Manuel(),
+            'worker.test',
+            setUp=setUp),
+        ))
+

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2011-10-31 10:56:24 UTC (rev 123193)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2011-10-31 10:56:27 UTC (rev 123194)
@@ -31,6 +31,7 @@
     def connect(self, addr):
         socket = gevent.socket.create_connection(addr)
         readers = self.connected(socket)
+        self.put((0, self.resume))
 
         while self.connected:
             try:
@@ -44,7 +45,7 @@
                 env = data
                 env['zc.resumelb.time'] = time.time()
                 env['zc.resumelb.lb_addr'] = self.addr
-                gevent.Greenlet.spawn(self.handle, rno, self.start(rno), env)
+                gevent.spawn(self.handle, rno, self.start(rno), env)
             else:
                 rput(data)
 
@@ -64,6 +65,7 @@
                 return
             else:
                 break
+        f.seek(0)
 
         def start_response(status, headers, exc_info=None):
             assert not exc_info # XXX
@@ -74,8 +76,9 @@
                 self.put((rno, data))
 
             self.put((rno, ''))
+            self.readers.pop(rno)
 
-            elapsed = time.time() - env['zc.resumelb.time']
+            elapsed = max(time.time() - env['zc.resumelb.time'], 1e-9)
             time_ring = self.time_ring
             time_ring_pos = rno % self.time_ring_size
             rclass = env['zc.resumelb.request_class']
@@ -107,5 +110,4 @@
     logging.basicConfig(level=logging.INFO)
     host, port = lb.split(':')
     Worker(app, (host, int(port)), history)
-    gevent.hub.get_hub().switch()
 

Added: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2011-10-31 10:56:27 UTC (rev 123194)
@@ -0,0 +1,222 @@
+Workers act as wsgi servers, but rather than listening for HTTP
+requests, they connect to a resumelb to request work.
+
+To test the worker, we'll start a testing lb that lets us control
+data going to and from the worker.
+
+    >>> import gevent.server, gevent.event
+    >>> worker_socket_result = gevent.event.AsyncResult()
+    >>> def handle(sock, addr):
+    ...     worker_socket_result.set(sock)
+
+    >>> server = gevent.server.StreamServer(('127.0.0.1', 0), handle)
+    >>> server.start()
+
+Now, we can start a worker:
+
+    >>> import zc.resumelb.worker, zc.resumelb.tests
+    >>> worker = gevent.spawn(
+    ...   zc.resumelb.worker.Worker,
+    ...   zc.resumelb.tests.app(),
+    ...   ('127.0.0.1', server.server_port),
+    ...   history=5)
+
+Here we created a worker using a test application, telling it to
+connect to our server address and to update its resume after every
+five requests.
+
+Now, wait for the worker to connect:
+
+    >>> worker_socket = worker_socket_result.get()
+
+Workers and the lb communicate via sized messages.
+
+Each message consists of binary request numbers, data size and a
+marshalled data string.  Helper functions help us read and write
+messages.  When workers connect, they send their resume and then wait
+for work to do.  Because our worker has no experience :), its resume
+is empty:
+
+    >>> from zc.resumelb.util import read_message, write_message
+    >>> read_message(worker_socket)
+    (0, {})
+
+When the worker sends its resume, it sends 0 as the request number.
+
+Now, let's send a request to the worker.  Requests are based on wsgi
+environments. We'll use webob to help us set this up.
+
+    >>> import webob
+    >>> def newenv(rclass, *a, **kw):
+    ...     r = webob.Request.blank(*a, **kw)
+    ...     env = r.environ.copy()
+    ...     inp = env.pop('wsgi.input')
+    ...     del env['wsgi.errors']
+    ...     env['zc.resumelb.request_class'] = rclass
+    ...     return env, inp
+
+    >>> env, _ = newenv('', '/hi.html')
+
+The newenv helper:
+
+- Creates a request environment
+- without input or error streams
+- with a passed request class. The request class is needed for the resume.
+
+    >>> write_message(worker_socket, 1, env, '')
+
+write_message takes a socket to write to, a request number and one or
+more data objects.  Here, we passed 2 data objects, the request
+environment and an empty string indicating the end of the (empty)
+request body.
+
+The worker will process the request and send back 3 records:
+
+- response status and headers,
+- response body, and
+- an empty end-of-body message.
+
+    >>> def print_response(worker_socket, rno, size_only=False):
+    ...     rn, (status, headers) = read_message(worker_socket)
+    ...     if rn != rno:
+    ...        raise AssertionError("Bad request numbers", rno, rn)
+    ...     print rno, status
+    ...     for h in sorted(headers):
+    ...         print "%s: %s" % h
+    ...     print
+    ...     size = 0
+    ...     while 1:
+    ...         rn, data = read_message(worker_socket)
+    ...         if rn != rno:
+    ...            raise AssertionError("Bad request numbers", rno, rn)
+    ...         if data:
+    ...             if size_only:
+    ...                 size += len(data)
+    ...             else:
+    ...                 print data,
+    ...         else:
+    ...             break
+    ...     if size_only:
+    ...        print size
+
+    >>> print_response(worker_socket, 1)
+    1 200 OK
+    Content-Length: 40
+    Content-Type: text/html; charset=UTF-8; charset=UTF-8
+    <BLANKLINE>
+    <BLANKLINE>
+    <BLANKLINE>
+    http://localhost/hi.html -> 6115 0 0
+    <BLANKLINE>
+
+We can have multiple outstanding requests:
+
+    >>> env, _ = newenv('', '/hi.html')
+    >>> write_message(worker_socket, 2, env)
+    >>> env, inp = newenv('1', '/hi.html')
+    >>> write_message(worker_socket, 3, env)
+    >>> env, inp = newenv('1', '/hi.html')
+    >>> write_message(worker_socket, 4, env)
+
+At this point, we have 3 outstanding requests.  Let's create 3 bodies:
+
+    >>> b2 = 'x'*1000
+    >>> b3 = 'y'*10000
+    >>> b4 = 'z'*100000
+
+    >>> hash(b2), hash(b3), hash(b4)
+    (8412291732507076776, 5203428436375121216, -3961752530033413152)
+
+and send them:
+
+    >>> write_message(worker_socket, 2, b2)
+    >>> write_message(worker_socket, 3, b3[:5000])
+    >>> write_message(worker_socket, 4, b4[:5000])
+    >>> write_message(worker_socket, 4, b4[5000:10000])
+    >>> write_message(worker_socket, 3, b3[5000:10000])
+    >>> for i in range(1, 10):
+    ...     write_message(worker_socket, 4, b4[i*10000:(i+1)*10000])
+
+    >>> write_message(worker_socket, 4, '')
+    >>> print_response(worker_socket, 4)
+    4 200 OK
+    Content-Length: 64
+    Content-Type: text/html; charset=UTF-8; charset=UTF-8
+    <BLANKLINE>
+    <BLANKLINE>
+    <BLANKLINE>
+    http://localhost/hi.html -> 6115 100000 -3961752530033413152
+    <BLANKLINE>
+
+    >>> write_message(worker_socket, 2, '')
+    >>> print_response(worker_socket, 2)
+    2 200 OK
+    Content-Length: 61
+    Content-Type: text/html; charset=UTF-8; charset=UTF-8
+    <BLANKLINE>
+    <BLANKLINE>
+    <BLANKLINE>
+    http://localhost/hi.html -> 6115 1000 8412291732507076776
+    <BLANKLINE>
+
+    >>> write_message(worker_socket, 3, '')
+    >>> print_response(worker_socket, 3)
+    3 200 OK
+    Content-Length: 62
+    Content-Type: text/html; charset=UTF-8; charset=UTF-8
+    <BLANKLINE>
+    <BLANKLINE>
+    <BLANKLINE>
+    http://localhost/hi.html -> 6115 10000 5203428436375121216
+    <BLANKLINE>
+
+The handler for hi.html outputs the url, the pid, the request body
+size and the request body hash.  Note that the hashes match the test
+bodies we created.
+
+We told the worker to use a history of length 5.  This means that it
+will keep track of times for the last 5 requests and compute a new
+resume after 5 requests.  Let's test that by making a 5th request.
+
+    >>> env, _ = newenv('2', '/sleep.html?dur=.11')
+    >>> write_message(worker_socket, 5, env, '')
+    >>> print_response(worker_socket, 5)
+    5 200 OK
+    Content-Length: 12
+    Content-Type: text/html; charset=UTF-8; charset=UTF-8
+    <BLANKLINE>
+    hello world
+
+The next message we receive will be the new resume:
+
+    >>> zero, resume = read_message(worker_socket)
+    >>> zero == zero
+    True
+    >>> resume.keys()
+    ['', '1', '2']
+
+    >>> [x for x in resume.values() if type(x) != float]
+    []
+
+    >>> resume[''] > 10, resume['1'] > 10, resume['2'] < 10
+    (True, True, True)
+
+The numbers in the resumes are average requests per second.  For the
+last request, we sleep .11 seconds to ensure that its resume entry
+would be less than 10.
+
+We can reuse request numbers. We normally don't reuse request numbers
+until we get to 4 billion or so, but let's make sure we can reuse
+them:
+
+    >>> env, _ = newenv('', '/gen.html?size=100')
+    >>> write_message(worker_socket, 1, env, '')
+
+In this example, we've also requested a very large output.
+
+    >>> print_response(worker_socket, 1, size_only=True)
+    1 200 OK
+    Content-Length: 1200000
+    Content-Type: text/html; charset=UTF-8
+    <BLANKLINE>
+    1200000


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
___________________________________________________________________
Added: svn:eol-style
   + native



More information about the checkins mailing list