[Checkins] SVN: Sandbox/J1m/resumelb/s Added (non-ad-hoc) ZooKeeper integration.

Jim Fulton jim at zope.com
Sun Jan 29 19:22:53 UTC 2012


Log message for revision 124247:
  Added (non-ad-hoc) ZooKeeper integration.
  
  Updated tests to take advantage of zope.testing 4.1.0
  

Changed:
  U   Sandbox/J1m/resumelb/setup.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/zk.test

-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2012-01-29 19:22:49 UTC (rev 124246)
+++ Sandbox/J1m/resumelb/setup.py	2012-01-29 19:22:53 UTC (rev 124247)
@@ -17,7 +17,8 @@
     'setuptools', 'gevent >=1.0b1', 'WebOb', 'zc.thread', 'zc.parse_addr',
     'zc.mappingobject', 'llist']
 extras_require = dict(
-    test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
+    test=['zope.testing >=4.1.0', 'bobo', 'manuel', 'WebTest', 'zc.zk',
+          'ZConfig', 'mock'])
 
 entry_points = """
 [console_scripts]

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-29 19:22:49 UTC (rev 124246)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-29 19:22:53 UTC (rev 124247)
@@ -32,7 +32,7 @@
 We pass the constructor an iterable of addresses.  The lb will connect
 to these addresses. Let's wait for it to do so:
 
-    >>> wait_until(
+    >>> wait(
     ...     lambda :
     ...     len([w for w in workers if hasattr(w, 'socket')]) == len(workers)
     ...     )
@@ -327,7 +327,7 @@
     >>> workers[0].server = gevent.server.StreamServer(
     ...     ('127.0.0.1', port), workers[0].handle)
     >>> workers[0].server.start()
-    >>> wait_until(lambda : workers[0].socket is not socket)
+    >>> wait(lambda : workers[0].socket is not socket)
     >>> write_message(workers[0].socket, 0, {'h3.com': 10.0})
     >>> gevent.sleep(.01)
     >>> len(lb.pool.workers)
@@ -344,7 +344,7 @@
 
     >>> workers.append(Worker())
     >>> lb.set_worker_addrs([w.addr for w in workers])
-    >>> wait_until(lambda : hasattr(workers[-1], 'socket'))
+    >>> wait(lambda : hasattr(workers[-1], 'socket'))
     >>> write_message(workers[-1].socket, 0, {'h4.com': 10})
     >>> gevent.sleep(.01)
     >>> len(lb.pool.workers)
@@ -384,3 +384,6 @@
 Typically, by the time we remove an address, the worker will already
 have gone away.
 
+Cleanup:
+
+    >>> lb.stop()

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2012-01-29 19:22:49 UTC (rev 124246)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2012-01-29 19:22:53 UTC (rev 124247)
@@ -19,9 +19,14 @@
 import manuel.doctest
 import manuel.testing
 import os
+import re
 import time
 import unittest
 import webob
+import zc.zk.testing
+import zope.testing.setupstack
+import zope.testing.wait
+import zope.testing.renormalizing
 
 pid = os.getpid()
 
@@ -46,28 +51,52 @@
 def app():
     return bobo.Application(bobo_resources=__name__)
 
-def wait_until(func=None, timeout=9):
-    if func is None:
-        return lambda f: wait_until(f, timeout)
-    deadline = time.time() + timeout
-    while time.time() < deadline:
-        if func():
-            return
-        gevent.sleep(.01)
-    raise ValueError('timeout')
+def test_classifier(env):
+    """Request classifier for zk.test: one class for every request."""
+    return "yup, it's a test"
 
 def setUp(test):
+    zope.testing.setupstack.setUpDirectory(test)
     global pid
     pid = 6115
-    test.globs['wait_until'] = wait_until
+    test.globs['wait'] = zope.testing.wait.Wait(getsleep=lambda : gevent.sleep)
 
+def zkSetUp(test):
+    setUp(test)
+    zc.zk.testing.setUp(test)
+    # optparse wraps --help output to the width in $COLUMNS; pin it so
+    # zk.test's expected help text is stable, and restore the previous
+    # value at tear-down so the setting doesn't leak into other tests.
+    old_columns = os.environ.get('COLUMNS')
+    os.environ['COLUMNS'] = '70'
+    def restore_columns():
+        if old_columns is None:
+            os.environ.pop('COLUMNS', None)
+        else:
+            os.environ['COLUMNS'] = old_columns
+    zope.testing.setupstack.register(test, restore_columns)
+
+def zkTearDown(test):
+    zc.zk.testing.tearDown(test)
+    # Runs functions registered by zkSetUp, e.g. restore_columns.
+    zope.testing.setupstack.tearDown(test)
+
 def test_suite():
     return unittest.TestSuite((
         manuel.testing.TestSuite(
             manuel.doctest.Manuel() + manuel.capture.Manuel(),
-            *(sorted(name for name in os.listdir(os.path.dirname(__file__))
-                     if name.endswith('.test')
-                     )),
-            setUp=setUp),
+            'lb.test', 'pool.test', 'worker.test',
+            setUp=setUp, tearDown=zope.testing.setupstack.tearDown),
+        manuel.testing.TestSuite(
+            manuel.doctest.Manuel(
+                checker = zope.testing.renormalizing.OutputChecker([
+                    (re.compile(
+                        r'\[\d{4}-\d\d-\d\d \d\d:\d\d:\d\d\] "(.+) \d+\.\d+'
+                        ),
+                     'ACCESS'),
+                    ])
+                ) + manuel.capture.Manuel(),
+            'zk.test',
+            setUp=zkSetUp, tearDown=zkTearDown),
         ))
 

Added: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py	2012-01-29 19:22:53 UTC (rev 124247)
@@ -0,0 +1,225 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""ZooKeeper integration
+"""
+import gevent
+import gevent.pool
+import gevent.pywsgi
+import logging
+import os
+import sys
+import zc.parse_addr
+import zc.zk
+
+def worker(app, global_conf, zookeeper, path, loggers=None, address=':0',
+           run=True):
+    """Paste deploy server runner
+
+    Create a resume-based worker for the WSGI application ``app``, with
+    settings read from the ZooKeeper node at ``path``, and register the
+    worker's address under ``path + '/providers'``.
+
+    global_conf
+      Paste global configuration (unused).
+
+    zookeeper
+      ZooKeeper connection string.
+
+    path
+      ZooKeeper node holding the worker's settings.
+
+    loggers
+      Optional ZConfig loggers-definition string.
+
+    address
+      Address to listen on, of the form HOST:PORT (default ':0').
+
+    run
+      If true (the default), serve until the server stops, then close
+      the ZooKeeper connection.  If false (testing), return the worker,
+      with the ZooKeeper connection stored in its ``zk`` attribute.
+    """
+    if loggers:
+        import ZConfig
+        ZConfig.configureLoggers(loggers)
+
+    zk = zc.zk.ZooKeeper(zookeeper)
+    address = zc.parse_addr.parse_addr(address)
+    from zc.resumelb.worker import Worker
+    worker = Worker(app, address, zk.properties(path))
+    # Register worker.addr (presumably the bound address) rather than the
+    # requested one, whose port may be 0.
+    zk.register_server(path+'/providers', worker.addr)
+    worker.zk = zk
+    if run:
+        try:
+            # worker.server is presumably a gevent server: those block in
+            # serve_forever() and have no run_forever() method.
+            worker.server.serve_forever()
+        finally:
+            logging.getLogger(__name__+'.worker').info('exiting')
+            zk.close()
+    else:
+        # Testing mode: yield briefly to other greenlets, then return.
+        gevent.sleep(.01)
+        return worker
+
+def lbmain(args=None, run=True):
+    """%prog [options] zookeeper_connection path
+
+    Run a resume-based load balancer on addr.
+    """
+    # The docstring above is also the optparse usage text (and is pinned
+    # by zk.test), so documentation lives in these comments instead.
+    #
+    # args: command-line arguments; defaults to sys.argv[1:].  A string
+    #   is split on whitespace and forces testing mode (run=False).
+    # run: if true, serve until the server stops, then close the
+    #   ZooKeeper connection and any access-log file opened here.  If
+    #   false, return (lb, server, accesslog) once the server is started.
+    #   Usage errors and -h return None in testing mode instead of exiting.
+    if args is None:
+        args = sys.argv[1:]
+    elif isinstance(args, str):
+        args = args.split()
+        run = False
+    import optparse
+    parser = optparse.OptionParser(lbmain.__doc__)
+    parser.add_option(
+        '-a', '--address', default=':0',
+        help="Address to listed on for web requests"
+        )
+    parser.add_option(
+        '-l', '--access-log', default='-',
+        help='Access-log path.\n\n'
+        'Use - (default) for standard output and an empty string to suppress.\n'
+        )
+    parser.add_option(
+        '-b', '--backlog', type='int',
+        help="Server backlog setting.")
+    parser.add_option(
+        '-m', '--max-connections', type='int',
+        help="Maximum number of simultanious accepted connections.")
+    parser.add_option(
+        '--logger-configuration',
+        help=
+        "Read logger configuration from the given configuration file path.\n"
+        "\n"
+        "The configuration file must be in ZConfig logger configuration syntax."
+        )
+    parser.add_option(
+        '-r', '--request-classifier', default='zc.resumelb.lb:host_classifier',
+        help="Request classification function (module:expr)"
+        )
+    parser.add_option(
+        '-e', '--disconnect-message',
+        help="Path to error page to use when a request is lost due to "
+        "worker disconnection"
+        )
+
+    try:
+        options, args = parser.parse_args(args)
+        if len(args) != 2:
+            print 'Error: must supply a zookeeper connection string and path.'
+            # Show the same help as -h, but exit with a non-zero status:
+            # this is a usage error, and -h exits with status 0.
+            parser.print_help()
+            sys.exit(1)
+        zookeeper, path = args
+    except SystemExit:
+        if run:
+            raise
+        else:
+            return
+
+    if options.logger_configuration:
+        import ZConfig
+        with open(options.logger_configuration) as f:
+            ZConfig.configureLoggers(f.read())
+
+    zk = zc.zk.ZooKeeper(zookeeper)
+    settings = zk.properties(path)
+    # Worker addresses are the providers registered under the workers
+    # subnode of the lb node.
+    addrs = zk.children(path+'/workers/providers')
+
+    # The classifier is given as module:expr; expr is evaluated in the
+    # module's namespace.  NOTE: this evals command-line text, so the
+    # command line must be trusted.
+    rcmod, rcexpr = options.request_classifier.split(':')
+    __import__(rcmod)
+    rcmod = sys.modules[rcmod]
+    request_classifier = eval(rcexpr, rcmod.__dict__)
+
+    # Import the lb module explicitly: default_disconnect_message is
+    # looked up on it below, and with a custom --request-classifier it
+    # would otherwise not necessarily have been imported yet.
+    import zc.resumelb.lb as lb_module
+
+    disconnect_message = options.disconnect_message
+    if disconnect_message:
+        with open(disconnect_message) as f:
+            disconnect_message = f.read()
+    else:
+        disconnect_message = lb_module.default_disconnect_message
+
+    lb = lb_module.LB(map(zc.parse_addr.parse_addr, addrs),
+                      request_classifier, settings, disconnect_message)
+    lb.zk = zk
+
+    # Set up notification of address changes.  The ZooKeeper callback
+    # only signals a gevent async watcher (presumably because zc.zk
+    # callbacks may run outside the hub's thread); the watcher then
+    # updates the lb's worker addresses from within the hub.
+    watcher = gevent.get_hub().loop.async()
+    @watcher.start
+    def _():
+        lb.set_worker_addrs(map(zc.parse_addr.parse_addr, addrs))
+    addrs(lambda a: watcher.send())
+
+    # Now, start a wsgi server
+    addr = zc.parse_addr.parse_addr(options.address)
+    if options.max_connections:
+        spawn = gevent.pool.Pool(options.max_connections)
+    else:
+        spawn = 'default'
+
+    # '-' means standard output; an empty string suppresses the access
+    # log (gevent's WSGIServer writes no request log when log is None)
+    # instead of failing on open('').
+    if options.access_log == '-':
+        accesslog = sys.stdout
+    elif options.access_log:
+        accesslog = open(options.access_log, 'a')
+    else:
+        accesslog = None
+
+    server = gevent.pywsgi.WSGIServer(
+        addr, lb.handle_wsgi, backlog = options.backlog,
+        spawn = spawn, log = accesslog)
+    server.start()
+    # Register the port actually bound (the requested one may be 0).
+    zk.register_server(path+'/providers', (addr[0], server.server_port))
+
+    if run:
+        try:
+            server.serve_forever()
+        finally:
+            logging.getLogger(__name__+'.lbmain').info('exiting')
+            zk.close()
+            if accesslog is not None and accesslog is not sys.stdout:
+                accesslog.close()
+    else:
+        gevent.sleep(.01)
+        return lb, server, accesslog


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.test	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.test	2012-01-29 19:22:53 UTC (rev 124247)
@@ -0,0 +1,257 @@
+=====================
+ZooKeeper integration
+=====================
+
+    >>> import zc.resumelb.zk, zc.resumelb.tests, zc.zk
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+
+To demonstrate the integration, we'll set up a tree:
+
+    >>> zk.import_tree("""
+    ... /test
+    ...   /lb
+    ...     /providers
+    ...     /workers
+    ...       history=999
+    ...       /providers
+    ... """)
+
+There are some things to note about this tree:
+
+- We have an lb node, where we'll configure an lb.  It has a workers
+  subnode where we'll configure the workers.  This subnode could be a
+  regular node, or a symbolic link.
+
+- Addresses of running workers and load balancers will be registered
+  at provider subnodes.
+
+Workers
+=======
+
+There's a paste server runner that:
+
+- Registers with ZooKeeper, and
+
+- Gets settings from ZooKeeper.
+
+The server runner takes options:
+
+zookeeper
+  A ZooKeeper connection string
+
+path
+  A ZooKeeper node where the worker will get settings and publish its
+  address.
+
+address
+  Address to listen on, of the form HOST:PORT
+
+  This defaults to ":0", to bind to a dynamic port on all IPv4 addresses.
+
+loggers
+  A ZConfig loggers-definition string
+
+Let's create a worker, making sure that ZConfig.configureLoggers was called.
+
+    >>> app = zc.resumelb.tests.app()
+    >>> import mock
+    >>> with mock.patch('ZConfig.configureLoggers') as configureLoggers:
+    ...     worker = zc.resumelb.zk.worker(
+    ...         app, None,
+    ...         zookeeper='zookeeper.example.com:2181', path='/test/lb/workers',
+    ...         address='127.0.0.1:0', run=False, loggers='loggers')
+    ...     configureLoggers.assert_called_with('loggers')
+
+Normally, when used with paste, the worker function runs forever.  We
+passed the run argument with a false value. The run argument exists
+solely for testing.
+
+The worker got its settings from the tree:
+
+    >>> worker.settings.history
+    999
+
+It registers its address as an ephemeral subnode of the providers
+subnode of the given path:
+
+    >>> [waddr] = zk.get_children('/test/lb/workers/providers')
+    >>> waddr == "%s:%s" % worker.addr
+    True
+
+    >>> meta = zk.get('/test/lb/workers/providers/' + waddr)[1]
+    >>> bool(meta['ephemeralOwner'])
+    True
+
+LB
+==
+
+There's a script entry point for a load balancer.  It takes
+command-line arguments and also has a run keyword argument for
+testing. As an added testing convenience, if arguments are passed as a
+string, the string is split and testing mode (run=False) is assumed.
+
+    >>> zc.resumelb.zk.lbmain('-h')
+    Usage: test [options] zookeeper_connection path
+    <BLANKLINE>
+        Run a resume-based load balancer on addr.
+    <BLANKLINE>
+    <BLANKLINE>
+    Options:
+      -h, --help            show this help message and exit
+      -a ADDRESS, --address=ADDRESS
+                            Address to listed on for web requests
+      -l ACCESS_LOG, --access-log=ACCESS_LOG
+                            Access-log path.  Use - (default) for
+                            standard output and an empty string to
+                            suppress.
+      -b BACKLOG, --backlog=BACKLOG
+                            Server backlog setting.
+      -m MAX_CONNECTIONS, --max-connections=MAX_CONNECTIONS
+                            Maximum number of simultanious accepted
+                            connections.
+      --logger-configuration=LOGGER_CONFIGURATION
+                            Read logger configuration from the given
+                            configuration file path.  The configuration
+                            file must be in ZConfig logger configuration
+                            syntax.
+      -r REQUEST_CLASSIFIER, --request-classifier=REQUEST_CLASSIFIER
+                            Request classification function
+                            (module:expr)
+      -e DISCONNECT_MESSAGE, --disconnect-message=DISCONNECT_MESSAGE
+                            Path to error page to use when a request is
+                            lost due to worker disconnection
+
+
+Let's start with a simple call:
+
+    >>> lb, server, accesslog = zc.resumelb.zk.lbmain(
+    ...     'zookeeper.example.com:2181 /test/lb')
+
+    >>> import sys
+    >>> accesslog is sys.stdout
+    True
+
+At this point, the lb is running and listening on all interfaces using
+a self-assigned port.
+
+It has connected to the worker and thus has a pool size of one:
+
+    >>> len(lb.pool.workers)
+    1
+
+Let's make a web request.  First, we'll get the lb address from ZooKeeper:
+
+    >>> import zc.parse_addr
+    >>> addr = zc.parse_addr.parse_addr(
+    ...     zk.get_children('/test/lb/providers')[0])
+
+All of the addresses are IP addresses, not hostnames. None of them include localhost:
+
+    >>> [a for a in zk.get_children('/test/lb/providers')
+    ...  if a.startswith('localhost:')]
+    []
+
+
+Then we'll make a simple GET request:
+
+    >>> import gevent.socket
+    >>> sock = gevent.socket.create_connection(('127.0.0.1', addr[1]))
+    >>> sock.sendall('''GET /hi.html HTTP/1.0\r
+    ... Host: h1.com\r
+    ... Content-Length: 0\r
+    ... \r
+    ... ''')
+
+    >>> print sock.recv(9999) # doctest: +ELLIPSIS
+    127.0.0.1 - - [2012-01-28 15:29:30] "GET /hi.html HTTP/1.0" 200 226 0.002349
+    HTTP/1.0 200 OK...
+
+If we create another worker, it will be seen by the load balancer:
+
+    >>> worker2 = zc.resumelb.zk.worker(
+    ...     app, None,
+    ...     zookeeper='zookeeper.example.com:2181', path='/test/lb/workers',
+    ...     address='127.0.0.1:0', run=False)
+
+    >>> len(lb.pool.workers)
+    2
+
+OK, so let's try a more complex example.  Maybe we can exercise all of
+the options!
+
+    >>> server.stop()
+    >>> lb.stop()
+    >>> lb.zk.close()
+
+    >>> with open('log.conf', 'w') as f:
+    ...   f.write('loggers')
+
+    >>> with open('oops.html', 'w') as f:
+    ...   f.write("oops")
+
+    >>> with mock.patch('ZConfig.configureLoggers') as configureLoggers:
+    ...     lb, server, accesslog = zc.resumelb.zk.lbmain(
+    ...         'zookeeper.example.com:2181 /test/lb'
+    ...         ' -alocalhost:0 -laccess.log -b1 -m1'
+    ...         ' --logger-configuration log.conf '
+    ...         ' -rzc.resumelb.tests:test_classifier -eoops.html'
+    ...         )
+    ...     configureLoggers.assert_called_with('loggers')
+
+    >>> lb.disconnect_message
+    'oops'
+
+    >>> [addr] = map(zc.parse_addr.parse_addr,
+    ...              zk.get_children('/test/lb/providers'))
+
+    >>> addr[0]
+    'localhost'
+
+    >>> len(lb.pool.workers)
+    2
+
+    >>> sock = gevent.socket.create_connection(addr)
+    >>> sock2 = gevent.socket.create_connection(addr)
+    >>> sock3 = gevent.socket.create_connection(addr, .01)
+    Traceback (most recent call last):
+    ...
+    timeout: timed out
+
+The 3rd connection failed because we said to only accept one
+connection at a time and set the backlog to 1.
+
+Let's do a request.
+
+    >>> sock.sendall('''GET /hi.html HTTP/1.0\r
+    ... Host: h1.com\r
+    ... Content-Length: 0\r
+    ... \r
+    ... ''')
+    >>> print sock.recv(9999) # doctest: +ELLIPSIS
+    HTTP/1.0 200 OK...
+
+We didn't get an access log entry in the output, because it's in the
+access log.
+
+    >>> accesslog.flush()
+    >>> with open('access.log') as f:
+    ...     print f.read()
+    127.0.0.1 - - [2012-01-29 14:11:37] "GET /hi.html HTTP/1.0" 200 226 0.001074
+    <BLANKLINE>
+
+By looking at the lb's pool's skilled data structure, we can see that
+the test request classifier was used.
+
+    >>> list(lb.pool.skilled)
+    ["yup, it's a test"]
+
+OK, now let's shut down the server and lb.
+
+    >>> server.stop()
+    >>> lb.stop()
+    >>> lb.zk.close()
+
+And the workers:
+
+    >>> worker.stop()
+    >>> worker2.stop()


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
___________________________________________________________________
Added: svn:eol-style
   + native



More information about the checkins mailing list