[Checkins] SVN: Sandbox/J1m/resumelb/s Added (non-ad-hoc) ZooKeeper integration.
Jim Fulton
jim at zope.com
Sun Jan 29 19:22:53 UTC 2012
Log message for revision 124247:
Added (non-ad-hoc) ZooKeeper integration.
Updated tests to take advantage of zope.testing 4.1.0
Changed:
U Sandbox/J1m/resumelb/setup.py
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
U Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
A Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
A Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py 2012-01-29 19:22:49 UTC (rev 124246)
+++ Sandbox/J1m/resumelb/setup.py 2012-01-29 19:22:53 UTC (rev 124247)
@@ -17,7 +17,8 @@
'setuptools', 'gevent >=1.0b1', 'WebOb', 'zc.thread', 'zc.parse_addr',
'zc.mappingobject', 'llist']
extras_require = dict(
- test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
+ test=['zope.testing', 'bobo', 'manuel', 'WebTest', 'zc.zk',
+ 'ZConfig', 'mock'])
entry_points = """
[console_scripts]
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2012-01-29 19:22:49 UTC (rev 124246)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2012-01-29 19:22:53 UTC (rev 124247)
@@ -32,7 +32,7 @@
We pass the constructor an iterable of addresses. The lb will connect
to these addresses. Let's wait for it to do so:
- >>> wait_until(
+ >>> wait(
... lambda :
... len([w for w in workers if hasattr(w, 'socket')]) == len(workers)
... )
@@ -327,7 +327,7 @@
>>> workers[0].server = gevent.server.StreamServer(
... ('127.0.0.1', port), workers[0].handle)
>>> workers[0].server.start()
- >>> wait_until(lambda : workers[0].socket is not socket)
+ >>> wait(lambda : workers[0].socket is not socket)
>>> write_message(workers[0].socket, 0, {'h3.com': 10.0})
>>> gevent.sleep(.01)
>>> len(lb.pool.workers)
@@ -344,7 +344,7 @@
>>> workers.append(Worker())
>>> lb.set_worker_addrs([w.addr for w in workers])
- >>> wait_until(lambda : hasattr(workers[-1], 'socket'))
+ >>> wait(lambda : hasattr(workers[-1], 'socket'))
>>> write_message(workers[-1].socket, 0, {'h4.com': 10})
>>> gevent.sleep(.01)
>>> len(lb.pool.workers)
@@ -384,3 +384,6 @@
Typically, by the time we remove an address, the worker will already
have gone away.
+Cleanup:
+
+ >>> lb.stop()
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py 2012-01-29 19:22:49 UTC (rev 124246)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py 2012-01-29 19:22:53 UTC (rev 124247)
@@ -19,9 +19,14 @@
import manuel.doctest
import manuel.testing
import os
+import re
import time
import unittest
import webob
+import zc.zk.testing
+import zope.testing.setupstack
+import zope.testing.wait
+import zope.testing.renormalizing
pid = os.getpid()
@@ -46,28 +51,40 @@
def app():
return bobo.Application(bobo_resources=__name__)
-def wait_until(func=None, timeout=9):
- if func is None:
- return lambda f: wait_until(f, timeout)
- deadline = time.time() + timeout
- while time.time() < deadline:
- if func():
- return
- gevent.sleep(.01)
- raise ValueError('timeout')
+def test_classifier(env):
+ return "yup, it's a test"
def setUp(test):
+ zope.testing.setupstack.setUpDirectory(test)
global pid
pid = 6115
- test.globs['wait_until'] = wait_until
+ test.globs['wait'] = zope.testing.wait.Wait(getsleep=lambda : gevent.sleep)
+def zkSetUp(test):
+ setUp(test)
+ zc.zk.testing.setUp(test)
+ os.environ['COLUMNS'] = '70'
+
+def zkTearDown(test):
+ zc.zk.testing.tearDown(test)
+ zope.testing.setupstack.tearDown(test)
+
def test_suite():
return unittest.TestSuite((
manuel.testing.TestSuite(
manuel.doctest.Manuel() + manuel.capture.Manuel(),
- *(sorted(name for name in os.listdir(os.path.dirname(__file__))
- if name.endswith('.test')
- )),
- setUp=setUp),
+ 'lb.test', 'pool.test', 'worker.test',
+ setUp=setUp, tearDown=zope.testing.setupstack.tearDown),
+ manuel.testing.TestSuite(
+ manuel.doctest.Manuel(
+ checker = zope.testing.renormalizing.OutputChecker([
+ (re.compile(
+ r'\[\d{4}-\d\d-\d\d \d\d:\d\d:\d\d\] "(.+) \d+\.\d+'
+ ),
+ 'ACCESS'),
+ ])
+ ) + manuel.capture.Manuel(),
+ 'zk.test',
+ setUp=zkSetUp, tearDown=zkTearDown),
))
Added: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py 2012-01-29 19:22:53 UTC (rev 124247)
@@ -0,0 +1,159 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""ZooKeeper integration
+"""
+import gevent
+import gevent.pool
+import logging
+import os
+import sys
+import zc.parse_addr
+import zc.zk
+
+def worker(app, global_conf, zookeeper, path, loggers=None, address=':0',
+ run=True):
+ """Paste deploy server runner
+ """
+ if loggers:
+ import ZConfig
+ ZConfig.configureLoggers(loggers)
+
+ zk = zc.zk.ZooKeeper(zookeeper)
+ address = zc.parse_addr.parse_addr(address)
+ from zc.resumelb.worker import Worker
+ worker = Worker(app, address, zk.properties(path))
+ zk.register_server(path+'/providers', worker.addr)
+ worker.zk = zk
+ if run:
+ try:
+ worker.server.run_forever()
+ finally:
+ logging.getLogger(__name__+'.worker').info('exiting')
+ zk.close()
+ else:
+ gevent.sleep(.01)
+ return worker
+
+def lbmain(args=None, run=True):
+ """%prog [options] zookeeper_connection path
+
+ Run a resume-based load balancer on addr.
+ """
+ if args is None:
+ args = sys.argv[1:]
+ elif isinstance(args, str):
+ args = args.split()
+ run = False
+ import optparse
+ parser = optparse.OptionParser(lbmain.__doc__)
+ parser.add_option(
+ '-a', '--address', default=':0',
+ help="Address to listed on for web requests"
+ )
+ parser.add_option(
+ '-l', '--access-log', default='-',
+ help='Access-log path.\n\n'
+ 'Use - (default) for standard output and an empty string to suppress.\n'
+ )
+ parser.add_option(
+ '-b', '--backlog', type='int',
+ help="Server backlog setting.")
+ parser.add_option(
+ '-m', '--max-connections', type='int',
+ help="Maximum number of simultanious accepted connections.")
+ parser.add_option(
+ '--logger-configuration',
+ help=
+ "Read logger configuration from the given configuration file path.\n"
+ "\n"
+ "The configuration file must be in ZConfig logger configuration syntax."
+ )
+ parser.add_option(
+ '-r', '--request-classifier', default='zc.resumelb.lb:host_classifier',
+ help="Request classification function (module:expr)"
+ )
+ parser.add_option(
+ '-e', '--disconnect-message',
+ help="Path to error page to use when a request is lost due to "
+ "worker disconnection"
+ )
+
+ try:
+ options, args = parser.parse_args(args)
+ if len(args) != 2:
+ print 'Error: must supply a zookeeper connection string and path.'
+ parser.parse_args(['-h'])
+ zookeeper, path = args
+ except SystemExit:
+ if run:
+ raise
+ else:
+ return
+
+ if options.logger_configuration:
+ import ZConfig
+ with open(options.logger_configuration) as f:
+ ZConfig.configureLoggers(f.read())
+
+
+ zk = zc.zk.ZooKeeper(zookeeper)
+ settings = zk.properties(path)
+ addrs = zk.children(path+'/workers/providers')
+ rcmod, rcexpr = options.request_classifier.split(':')
+ __import__(rcmod)
+ rcmod = sys.modules[rcmod]
+ request_classifier = eval(rcexpr, rcmod.__dict__)
+
+ disconnect_message = options.disconnect_message
+ if disconnect_message:
+ with open(disconnect_message) as f:
+ disconnect_message = f.read()
+ else:
+ disconnect_message = zc.resumelb.lb.default_disconnect_message
+
+ from zc.resumelb.lb import LB
+ lb = LB(map(zc.parse_addr.parse_addr, addrs),
+ request_classifier, settings, disconnect_message)
+ lb.zk = zk
+
+ # Set up notification of address changes.
+ watcher = gevent.get_hub().loop.async()
+ @watcher.start
+ def _():
+ lb.set_worker_addrs(map(zc.parse_addr.parse_addr, addrs))
+ addrs(lambda a: watcher.send())
+
+ # Now, start a wsgi server
+ addr = zc.parse_addr.parse_addr(options.address)
+ if options.max_connections:
+ spawn= gevent.pool.Pool(options.max_connections)
+ else:
+ spawn = 'default'
+ accesslog = (sys.stdout if options.access_log == '-'
+ else open(options.access_log, 'a'))
+ server = gevent.pywsgi.WSGIServer(
+ addr, lb.handle_wsgi, backlog = options.backlog,
+ spawn = spawn, log = accesslog)
+ server.start()
+ zk.register_server(path+'/providers', (addr[0], server.server_port))
+
+ if run:
+ try:
+ server.serve_forever()
+ finally:
+ logging.getLogger(__name__+'.lbmain').info('exiting')
+ zk.close()
+ else:
+ gevent.sleep(.01)
+ return lb, server, accesslog
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.test (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.test 2012-01-29 19:22:53 UTC (rev 124247)
@@ -0,0 +1,257 @@
+=====================
+ZooKeeper integration
+=====================
+
+ >>> import zc.resumelb.zk, zc.resumelb.tests, zc.zk
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+
+To demonstrate the integration, we'll set up a tree:
+
+ >>> zk.import_tree("""
+ ... /test
+ ... /lb
+ ... /providers
+ ... /workers
+ ... history=999
+ ... /providers
+ ... """)
+
+There are some things to note about this tree:
+
+- We have an lb node, where we'll configure an lb. It has a workers
+ subnode where we'll configure the workers. This subnode could be a
+ regular node, or a symbolic link.
+
+- Addresses of running workers and load balancers will be registered
+ at provider subnodes.
+
+Workers
+=======
+
+There's a paste server runner that:
+
+- Registers with ZooKeeper, and
+
+- Gets settings from ZooKeeper.
+
+The server runner takes options:
+
+zookeeper
+ A ZooKeeper connection string
+
+path
+ A ZooKeeper path where the worker will get settings and publish its
+ address.
+
+address
+ Address to listen on, of the form HOST:PORT
+
+ This defaults to ":0", to bind to a dynamic port on all IPv4 addresses.
+
+loggers
+ A ZConfig loggers-definition string
+
+Let's create a worker, making sure that ZConfig.configureLoggers was called.
+
+ >>> app = zc.resumelb.tests.app()
+ >>> import mock
+ >>> with mock.patch('ZConfig.configureLoggers') as configureLoggers:
+ ... worker = zc.resumelb.zk.worker(
+ ... app, None,
+ ... zookeeper='zookeeper.example.com:2181', path='/test/lb/workers',
+ ... address='127.0.0.1:0', run=False, loggers='loggers')
+ ... configureLoggers.assert_called_with('loggers')
+
+Normally, when used with paste, the worker function runs forever. We
+passed the run argument with a false value. The run argument exists
+solely for testing.
+
+The worker got its settings from the tree:
+
+ >>> worker.settings.history
+ 999
+
+It registers its address as an ephemeral subnode of the providers
+subnode of the given path:
+
+ >>> [waddr] = zk.get_children('/test/lb/workers/providers')
+ >>> waddr == "%s:%s" % worker.addr
+ True
+
+ >>> meta = zk.get('/test/lb/workers/providers/' + waddr)[1]
+ >>> bool(meta['ephemeralOwner'])
+ True
+
+LB
+==
+
+There's a script entry point for a load balancer. It takes
+command-line arguments and also has a run keyword argument for
+testing. As an added testing convenience, if arguments are passed as a
+string, the string is split and testing mode (run=False) is assumed.
+
+ >>> zc.resumelb.zk.lbmain('-h')
+ Usage: test [options] zookeeper_connection path
+ <BLANKLINE>
+ Run a resume-based load balancer on addr.
+ <BLANKLINE>
+ <BLANKLINE>
+ Options:
+ -h, --help show this help message and exit
+ -a ADDRESS, --address=ADDRESS
+ Address to listed on for web requests
+ -l ACCESS_LOG, --access-log=ACCESS_LOG
+ Access-log path. Use - (default) for
+ standard output and an empty string to
+ suppress.
+ -b BACKLOG, --backlog=BACKLOG
+ Server backlog setting.
+ -m MAX_CONNECTIONS, --max-connections=MAX_CONNECTIONS
+ Maximum number of simultanious accepted
+ connections.
+ --logger-configuration=LOGGER_CONFIGURATION
+ Read logger configuration from the given
+ configuration file path. The configuration
+ file must be in ZConfig logger configuration
+ syntax.
+ -r REQUEST_CLASSIFIER, --request-classifier=REQUEST_CLASSIFIER
+ Request classification function
+ (module:expr)
+ -e DISCONNECT_MESSAGE, --disconnect-message=DISCONNECT_MESSAGE
+ Path to error page to use when a request is
+ lost due to worker disconnection
+
+
+Let's start with a simple call:
+
+ >>> lb, server, accesslog = zc.resumelb.zk.lbmain(
+ ... 'zookeeper.example.com:2181 /test/lb')
+
+ >>> import sys
+ >>> accesslog is sys.stdout
+ True
+
+At this point, the lb is running and listening on all interfaces using
+a self assigned port.
+
+It has connected to the worker and thus has a pool size of one:
+
+ >>> len(lb.pool.workers)
+ 1
+
+Let's make a web request. First, we'll get the lb address from ZooKeeper:
+
+ >>> import zc.parse_addr
+ >>> addr = zc.parse_addr.parse_addr(
+ ... zk.get_children('/test/lb/providers')[0])
+
+All of the addresses are ips, not hostnames. None of them include localhost:
+
+ >>> [a for a in zk.get_children('/test/lb/providers')
+ ... if a.startswith('localhost:')]
+ []
+
+
+Then we'll make a simple GET request:
+
+ >>> import gevent.socket
+ >>> sock = gevent.socket.create_connection(('127.0.0.1', addr[1]))
+ >>> sock.sendall('''GET /hi.html HTTP/1.0\r
+ ... Host: h1.com\r
+ ... Content-Length: 0\r
+ ... \r
+ ... ''')
+
+ >>> print sock.recv(9999) # doctest: +ELLIPSIS
+ 127.0.0.1 - - [2012-01-28 15:29:30] "GET /hi.html HTTP/1.0" 200 226 0.002349
+ HTTP/1.0 200 OK...
+
+If we create another worker, it will be seen by the load balancer:
+
+ >>> worker2 = zc.resumelb.zk.worker(
+ ... app, None,
+ ... zookeeper='zookeeper.example.com:2181', path='/test/lb/workers',
+ ... address='127.0.0.1:0', run=False)
+
+ >>> len(lb.pool.workers)
+ 2
+
+OK, so let's try a more complex example. Maybe we can exercise all of
+the options!
+
+ >>> server.stop()
+ >>> lb.stop()
+ >>> lb.zk.close()
+
+ >>> with open('log.conf', 'w') as f:
+ ... f.write('loggers')
+
+ >>> with open('oops.html', 'w') as f:
+ ... f.write("oops")
+
+ >>> with mock.patch('ZConfig.configureLoggers') as configureLoggers:
+ ... lb, server, accesslog = zc.resumelb.zk.lbmain(
+ ... 'zookeeper.example.com:2181 /test/lb'
+ ... ' -alocalhost:0 -laccess.log -b1 -m1'
+ ... ' --logger-configuration log.conf '
+ ... ' -rzc.resumelb.tests:test_classifier -eoops.html'
+ ... )
+ ... configureLoggers.assert_called_with('loggers')
+
+ >>> lb.disconnect_message
+ 'oops'
+
+ >>> [addr] = map(zc.parse_addr.parse_addr,
+ ... zk.get_children('/test/lb/providers'))
+
+ >>> addr[0]
+ 'localhost'
+
+ >>> len(lb.pool.workers)
+ 2
+
+ >>> sock = gevent.socket.create_connection(addr)
+ >>> sock2 = gevent.socket.create_connection(addr)
+ >>> sock3 = gevent.socket.create_connection(addr, .01)
+ Traceback (most recent call last):
+ ...
+ timeout: timed out
+
+The 3rd connection failed because we said to only accept one
+connection at a time and set the backlog to 1.
+
+Let's do a request.
+
+ >>> sock.sendall('''GET /hi.html HTTP/1.0\r
+ ... Host: h1.com\r
+ ... Content-Length: 0\r
+ ... \r
+ ... ''')
+ >>> print sock.recv(9999) # doctest: +ELLIPSIS
+ HTTP/1.0 200 OK...
+
+We didn't get an access log entry in the output, because it's in the
+access log.
+
+ >>> accesslog.flush()
+ >>> with open('access.log') as f:
+ ... print f.read()
+ 127.0.0.1 - - [2012-01-29 14:11:37] "GET /hi.html HTTP/1.0" 200 226 0.001074
+ <BLANKLINE>
+
+By looking at the lb's pool's skilled data structure, we can see that
+the test request classifier was used.
+
+ >>> list(lb.pool.skilled)
+ ["yup, it's a test"]
+
+OK. now let's shut down the server and lb.
+
+ >>> server.stop()
+ >>> lb.stop()
+ >>> lb.zk.close()
+
+And the workers:
+
+ >>> worker.stop()
+ >>> worker2.stop()
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
___________________________________________________________________
Added: svn:eol-style
+ native
More information about the checkins
mailing list