[Checkins] SVN: Sandbox/J1m/resumelb/s Added worker tests.
Jim Fulton
jim at zope.com
Mon Oct 31 10:56:27 UTC 2011
Log message for revision 123194:
Added worker tests.
Changed:
U Sandbox/J1m/resumelb/setup.py
U Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
U Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
A Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py 2011-10-31 10:56:24 UTC (rev 123193)
+++ Sandbox/J1m/resumelb/setup.py 2011-10-31 10:56:27 UTC (rev 123194)
@@ -14,7 +14,7 @@
name, version = 'zc.resumelb', '0'
install_requires = ['setuptools', 'gevent']
-extras_require = dict(test=['zope.testing', 'bobo'])
+extras_require = dict(test=['zope.testing', 'bobo', 'WebOb', 'manuel'])
entry_points = """
[console_scripts]
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py 2011-10-31 10:56:24 UTC (rev 123193)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py 2011-10-31 10:56:27 UTC (rev 123194)
@@ -1,8 +1,59 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
import bobo
+import doctest
+import gevent
+import manuel.capture
+import manuel.doctest
+import manuel.testing
import os
+import time
+import unittest
+import webob
- at bobo.query
-def hi(bobo_request):
- return "\n\n%s -> %s\n\n" % (
- bobo_request.url, os.getpid())
+pid = os.getpid()
+ at bobo.resource
+def hi(request):
+ body = request.environ['wsgi.input'].read()
+ return "\n\n%s -> %s %s %s\n\n" % (
+ request.url, pid, len(body), hash(body))
+
+ at bobo.query('/gen.html')
+def gen(size=0):
+ size = int(size)
+ return webob.Response(
+ app_iter=['hello world\n'*1000]*size,
+ content_length=12000*size)
+
+ at bobo.query('/sleep.html')
+def sleep(dur=0):
+ time.sleep(float(dur))
+ return 'hello world\n'
+
+def app():
+ return bobo.Application(bobo_resources=__name__)
+
+def setUp(test):
+ global pid
+ pid = 6115
+
+def test_suite():
+ return unittest.TestSuite((
+ manuel.testing.TestSuite(
+ manuel.doctest.Manuel() + manuel.capture.Manuel(),
+ 'worker.test',
+ setUp=setUp),
+ ))
+
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2011-10-31 10:56:24 UTC (rev 123193)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2011-10-31 10:56:27 UTC (rev 123194)
@@ -31,6 +31,7 @@
def connect(self, addr):
socket = gevent.socket.create_connection(addr)
readers = self.connected(socket)
+ self.put((0, self.resume))
while self.connected:
try:
@@ -44,7 +45,7 @@
env = data
env['zc.resumelb.time'] = time.time()
env['zc.resumelb.lb_addr'] = self.addr
- gevent.Greenlet.spawn(self.handle, rno, self.start(rno), env)
+ gevent.spawn(self.handle, rno, self.start(rno), env)
else:
rput(data)
@@ -64,6 +65,7 @@
return
else:
break
+ f.seek(0)
def start_response(status, headers, exc_info=None):
assert not exc_info # XXX
@@ -74,8 +76,9 @@
self.put((rno, data))
self.put((rno, ''))
+ self.readers.pop(rno)
- elapsed = time.time() - env['zc.resumelb.time']
+ elapsed = max(time.time() - env['zc.resumelb.time'], 1e-9)
time_ring = self.time_ring
time_ring_pos = rno % self.time_ring_size
rclass = env['zc.resumelb.request_class']
@@ -107,5 +110,4 @@
logging.basicConfig(level=logging.INFO)
host, port = lb.split(':')
Worker(app, (host, int(port)), history)
- gevent.hub.get_hub().switch()
Added: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.test (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.test 2011-10-31 10:56:27 UTC (rev 123194)
@@ -0,0 +1,222 @@
+Workers act as wsgi servers, but rather than listenting for HTTP
+requests, they connect to a resumelb to request work.
+
+To test the worker, we'll start a testing lb that let's us control
+data going to and from the worker.
+
+ >>> import gevent.server, gevent.event
+ >>> worker_socket_result = gevent.event.AsyncResult()
+ >>> def handle(sock, addr):
+ ... worker_socket_result.set(sock)
+
+ >>> server = gevent.server.StreamServer(('127.0.0.1', 0), handle)
+ >>> server.start()
+
+Now, we can start a worker:
+
+ >>> import zc.resumelb.worker, zc.resumelb.tests
+ >>> worker = gevent.spawn(
+ ... zc.resumelb.worker.Worker,
+ ... zc.resumelb.tests.app(),
+ ... ('127.0.0.1', server.server_port),
+ ... history=5)
+
+Here we created a worker using a test application, telling it to
+connect to our server address and to update it's resume after every
+five requests.
+
+Now, wait for the worker to connect:
+
+ >>> worker_socket = worker_socket_result.get()
+
+Workers and the lb communicate via sized messages.
+
+Each message consists of binary request numbers, data size and a
+marshalled data string. Helper functions help us read and write
+messages. When workers connect, they send their resume and then wait
+for work to do. Because our worker has no experience :), it's resume
+is empty:
+
+ >>> from zc.resumelb.util import read_message, write_message
+ >>> read_message(worker_socket)
+ (0, {})
+
+When the worker sends it's resume, it sends 0 as the request number.
+
+Now, let's send a request to the worker. Requests are based on wsgi
+environments. We'll use webob to help us set this up.
+
+ >>> import webob
+ >>> def newenv(rclass, *a, **kw):
+ ... r = webob.Request.blank(*a, **kw)
+ ... env = r.environ.copy()
+ ... inp = env.pop('wsgi.input')
+ ... del env['wsgi.errors']
+ ... env['zc.resumelb.request_class'] = rclass
+ ... return env, inp
+
+ >>> env, _ = newenv('', '/hi.html')
+
+The newenv helper:
+
+- Creates a request enviton
+- without input or error streams
+- with a passed request class. The request class is needed for the resume.
+
+ >>> write_message(worker_socket, 1, env, '')
+
+write_message takes a socket to write to, a request number and one or
+more data objects. Here, we passed 2 data objects, the request
+environment and an empty string indicating the end of the (empty)
+request body.
+
+The worker will process the request and send back 3 records:
+
+- response status and headers,
+- response body, and
+- an empty end-of-body message.
+
+ >>> def print_response(worker_socket, rno, size_only=False):
+ ... rn, (status, headers) = read_message(worker_socket)
+ ... if rn != rno:
+ ... raise AssertionError("Bad request numbers", rno, rn)
+ ... print rno, status
+ ... for h in sorted(headers):
+ ... print "%s: %s" % h
+ ... print
+ ... size = 0
+ ... while 1:
+ ... rn, data = read_message(worker_socket)
+ ... if rn != rno:
+ ... raise AssertionError("Bad request numbers", rno, rn)
+ ... if data:
+ ... if size_only:
+ ... size += len(data)
+ ... else:
+ ... print data,
+ ... else:
+ ... break
+ ... if size_only:
+ ... print size
+
+ >>> print_response(worker_socket, 1)
+ 1 200 OK
+ Content-Length: 40
+ Content-Type: text/html; charset=UTF-8; charset=UTF-8
+ <BLANKLINE>
+ <BLANKLINE>
+ <BLANKLINE>
+ http://localhost/hi.html -> 6115 0 0
+ <BLANKLINE>
+
+We can have multiple outstanding requests:
+
+ >>> env, _ = newenv('', '/hi.html')
+ >>> write_message(worker_socket, 2, env)
+ >>> env, inp = newenv('1', '/hi.html')
+ >>> write_message(worker_socket, 3, env)
+ >>> env, inp = newenv('1', '/hi.html')
+ >>> write_message(worker_socket, 4, env)
+
+At this point, we have 3 outstading requests. Let's create 3 bodies:
+
+ >>> b2 = 'x'*1000
+ >>> b3 = 'y'*10000
+ >>> b4 = 'z'*100000
+
+ >>> hash(b2), hash(b3), hash(b4)
+ (8412291732507076776, 5203428436375121216, -3961752530033413152)
+
+and send them:
+
+ >>> write_message(worker_socket, 2, b2)
+ >>> write_message(worker_socket, 3, b3[:5000])
+ >>> write_message(worker_socket, 4, b4[:5000])
+ >>> write_message(worker_socket, 4, b4[5000:10000])
+ >>> write_message(worker_socket, 3, b3[5000:10000])
+ >>> for i in range(1, 10):
+ ... write_message(worker_socket, 4, b4[i*10000:(i+1)*10000])
+
+ >>> write_message(worker_socket, 4, '')
+ >>> print_response(worker_socket, 4)
+ 4 200 OK
+ Content-Length: 64
+ Content-Type: text/html; charset=UTF-8; charset=UTF-8
+ <BLANKLINE>
+ <BLANKLINE>
+ <BLANKLINE>
+ http://localhost/hi.html -> 6115 100000 -3961752530033413152
+ <BLANKLINE>
+
+ >>> write_message(worker_socket, 2, '')
+ >>> print_response(worker_socket, 2)
+ 2 200 OK
+ Content-Length: 61
+ Content-Type: text/html; charset=UTF-8; charset=UTF-8
+ <BLANKLINE>
+ <BLANKLINE>
+ <BLANKLINE>
+ http://localhost/hi.html -> 6115 1000 8412291732507076776
+ <BLANKLINE>
+
+ >>> write_message(worker_socket, 3, '')
+ >>> print_response(worker_socket, 3)
+ 3 200 OK
+ Content-Length: 62
+ Content-Type: text/html; charset=UTF-8; charset=UTF-8
+ <BLANKLINE>
+ <BLANKLINE>
+ <BLANKLINE>
+ http://localhost/hi.html -> 6115 10000 5203428436375121216
+ <BLANKLINE>
+
+The handler for hi.html outputs the url, the pid, the request body
+size and the request body hash. Note that the hashes match the test
+bodies we created.
+
+We told the worker to use a history of length 5. This means that it
+will keep track of times for the last 5 requests and compute a new
+resume after 5 requests. Let's test that by making a 5th request.
+
+ >>> env, _ = newenv('2', '/sleep.html?dur=.11')
+ >>> write_message(worker_socket, 5, env, '')
+ >>> print_response(worker_socket, 5)
+ 5 200 OK
+ Content-Length: 12
+ Content-Type: text/html; charset=UTF-8; charset=UTF-8
+ <BLANKLINE>
+ hello world
+
+The next message we recieve will be the new resume:
+
+ >>> zero, resume = read_message(worker_socket)
+ >>> zero == zero
+ True
+ >>> resume.keys()
+ ['', '1', '2']
+
+ >>> [x for x in resume.values() if type(x) != float]
+ []
+
+ >>> resume[''] > 10, resume['1'] > 10, resume['2'] < 10
+ (True, True, True)
+
+The numbers in the resumes are average requests per second. For the
+last request, we sleep .11 seconds to assure that it's resume entry
+would be less than 10.
+
+We can reuse request numbers. We normally don't reuse request numbers
+until we get to 4 billion or so., But lots make sure we can reuse
+them:
+
+ >>> env, _ = newenv('', '/gen.html?size=100')
+ >>> write_message(worker_socket, 1, env, '')
+
+In this example, we've also requested a very large output.
+
+ >>> print_response(worker_socket, 1, size_only=True)
+ 1 200 OK
+ Content-Length: 1200000
+ Content-Type: text/html; charset=UTF-8
+ <BLANKLINE>
+ 1200000
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
___________________________________________________________________
Added: svn:eol-style
+ native
More information about the checkins
mailing list