[Checkins] SVN: Sandbox/J1m/resumelb/ Working prototype.

Jim Fulton jim at zope.com
Thu Oct 27 11:03:52 UTC 2011


Log message for revision 123166:
  Working prototype.
  

Changed:
  U   Sandbox/J1m/resumelb/buildout.cfg
  U   Sandbox/J1m/resumelb/setup.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/
  A   Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/util.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/worker.py

-=-
Modified: Sandbox/J1m/resumelb/buildout.cfg
===================================================================
--- Sandbox/J1m/resumelb/buildout.cfg	2011-10-27 11:01:58 UTC (rev 123165)
+++ Sandbox/J1m/resumelb/buildout.cfg	2011-10-27 11:03:51 UTC (rev 123166)
@@ -1,12 +1,55 @@
 [buildout]
 develop = .
-parts = test py
+#parts = test py
+parts = gevent py ctl
 
+[ctl]
+recipe = zc.recipe.rhrc
+dest = ${buildout:bin-directory}
+parts = lb worker1 worker2 worker3
+
+[libevent]
+recipe = zc.recipe.cmmi
+url = https://github.com/downloads/libevent/libevent/libevent-2.0.14-stable.tar.gz
+
+[gevent]
+recipe = zc.recipe.egg:custom
+include-dirs = ${libevent:location}/include
+library-dirs = ${libevent:location}/lib
+
 [test]
 recipe = zc.recipe.testrunner
-eggs = 
+eggs = zc.resumelb [test]
 
 [py]
 recipe = zc.recipe.egg
 eggs = ${test:eggs}
+       PasteScript
+       
 interpreter = py
+
+[lb]
+recipe = zc.zdaemonrecipe
+worker_addr = :8000
+program = ${buildout:bin-directory}/resumelb :8080 ${:worker_addr}
+
+[worker.ini]
+recipe = zc.recipe.deployment:configuration
+text =
+  [app:main]
+  use = egg:bobo
+  bobo_resources = zc.resumelb.tests
+  
+  [server:main]
+  use = egg:zc.resumelb
+  lb = ${lb:worker_addr}
+
+[worker1]
+recipe = zc.zdaemonrecipe
+program = ${buildout:bin-directory}/paster serve ${worker.ini:location}
+
+[worker2]
+<= worker1
+
+[worker3]
+<= worker1

Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2011-10-27 11:01:58 UTC (rev 123165)
+++ Sandbox/J1m/resumelb/setup.py	2011-10-27 11:03:51 UTC (rev 123166)
@@ -11,12 +11,17 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-name, version = 'zc.', '0'
+name, version = 'zc.resumelb', '0'
 
-install_requires = ['setuptools']
-extras_require = dict(test=['zope.testing'])
+install_requires = ['setuptools', 'gevent']
+extras_require = dict(test=['zope.testing', 'bobo'])
 
 entry_points = """
+[console_scripts]
+resumelb = zc.resumelb.lb:main
+
+[paste.server_runner]
+main = zc.resumelb.worker:server_runner
 """
 
 from setuptools import setup

Added: Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py	2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1 @@
+#


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1,183 @@
+import bisect
+import gevent
+import gevent.hub
+import gevent.pywsgi
+import gevent.server
+import logging
+import sys
+import zc.resumelb.util
+
+block_size = 1<<16
+
+logger = logging.getLogger(__name__)
+
+class Server:
+
+    def __init__(self, wsgi_addr, worker_addr, classifier):
+        self.workers = {None: []}
+        self.workers_event = gevent.event.Event()
+        self.wsgi_addr = wsgi_addr
+        self.worker_addr = worker_addr
+        self.classifier = classifier
+        self.worker_server = gevent.server.StreamServer(
+            worker_addr, self.handle_worker)
+        self.worker_server.start()
+        self.wsgi_server = gevent.pywsgi.WSGIServer(wsgi_addr, self.handle_wsgi)
+        self.wsgi_server.start()
+
+    def handle_worker(self, socket, addr):
+        logger.info('new worker')
+        Worker(self, socket, addr)
+
+    def handle_wsgi(self, env, start_response):
+        rclass = self.classifier(env)
+        logger.debug('wsgi: %s', rclass)
+        env['zc.resumelb.request_class'] = rclass
+
+        while 1:
+            workers = self.workers.get(rclass)
+            if workers:
+                worker = workers[-1][1]
+            else:
+                workers = self.workers.get(None)
+                if workers:
+                    worker = workers[-1][1]
+                    assert rclass not in worker.resume
+                    worker.unregister()
+                    worker.resume[rclass] = 1
+                    worker.register()
+                else:
+                    self.workers_event.clear()
+                    self.workers_event.wait()
+                    continue
+
+            try:
+                return worker.handle(env, start_response)
+            except worker.Disconnected:
+                # XXX need to be more careful about whether
+                # start_response was called.
+                if int(env.get(CONTENT_LENGTH, 0)) == 0:
+                    logger.info("retrying %s", env)
+                else:
+                    raise
+
+class Worker(zc.resumelb.util.Worker):
+
+    def __init__(self, server, socket, addr):
+        self.server = server
+        self.nrequest = 0
+        self.resume = {}
+
+        readers = self.connected(socket, addr)
+        self.register()
+
+        while self.connected:
+            try:
+                rno, data = zc.resumelb.util.read_message(socket)
+            except gevent.GreenletExit:
+                self.disconnected()
+                return
+
+            if rno == 0:
+                self.unregister()
+                self.resume = data
+                self.register()
+            else:
+                readers[rno](data)
+
+    def handle(self, env, start_response):
+        logger.debug('handled by %s', self.addr)
+
+        env = env.copy()
+        err = env.pop('wsgi.errors')
+        input = env.pop('wsgi.input')
+
+        rno = self.nrequest = self.nrequest + 1
+        get = self.start(rno)
+
+        self.put((rno, env))
+        content_length = int(env.get('CONTENT_LENGTH', 0))
+        while content_length > 0:
+            data = input.read(min(content_length, block_size))
+            if not data:
+                # Browser disconnected, cancel the request
+                self.put((rno, None))
+                self.end(rno)
+                return
+            content_length -= len(data)
+            self.put((rno, data))
+        self.put((rno, ''))
+
+        data = get()
+        if data is None:
+            raise self.Disconnected()
+        logger.debug('start_response %r', data)
+        start_response(*data)
+
+        def content():
+            while 1:
+                data = get()
+                if data:
+                    logger.debug('yield %r', data)
+                    yield data
+                elif data is None:
+                    raise self.Disconnected()
+                else:
+                    self.end(rno)
+                    break
+
+        return content()
+
+    def register(self):
+        resume = self.resume
+        self.score = - sum(resume.itervalues())
+        workers_by = self.server.workers
+
+        def _register(workers, score):
+            item = score, self
+            index = bisect.bisect_left(workers, item)
+            workers.insert(index, item)
+
+        _register(workers_by[None], self.score)
+
+        for site, score in resume.iteritems():
+            workers = workers_by.get(site)
+            if workers is None:
+                workers = workers_by[site] = []
+            _register(workers, score)
+
+        self.server.workers_event.set()
+
+    def unregister(self):
+        workers_by = self.server.workers
+
+        def _unregister(workers, score):
+            index = bisect.bisect_left(workers, (score, self))
+            del workers[index]
+
+        _unregister(workers_by[None], self.score)
+        for site, score in self.resume.iteritems():
+            _unregister(workers_by[site], score)
+
+    def disconnected(self):
+        self.unregister()
+        zc.resumelb.util.Worker.disconnected(self)
+
+def parse_addr(addr):
+    host, port = addr.split(':')
+    return host, int(port)
+
+def host_classifier(env):
+    return env.get("HTTP_HOST", '')
+
+def main(args=None):
+    if args is None:
+        args = sys.argv[1:]
+
+    logging.basicConfig(level=logging.INFO)
+    laddr, waddr = args
+    Server(parse_addr(laddr), parse_addr(waddr), host_classifier)
+
+    gevent.hub.get_hub().switch()
+
+


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py	2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1,8 @@
+import bobo
+import os
+
+@bobo.query
+def hi(bobo_request):
+    return "\n\n%s -> %s\n\n" % (
+        bobo_request.url, os.getpid())
+


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: Sandbox/J1m/resumelb/src/zc/resumelb/util.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/util.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/util.py	2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1,94 @@
+from struct import pack, unpack
+import errno
+import gevent.queue
+import logging
+import marshal
+import socket
+
+logger = logging.getLogger(__name__)
+
+disconnected_errors = (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN,
+                 errno.ESHUTDOWN, errno.ECONNABORTED)
+
+def read_message(sock):
+    data = ''
+    while len(data) < 8:
+        recieved = sock.recv(8-len(data))
+        if not recieved:
+            logger.info("read_message disconnected %s", sock)
+            raise gevent.GreenletExit()
+        data += recieved
+
+    rno, l = unpack(">II", data)
+
+    data = ''
+    while len(data) < l:
+        recieved = sock.recv(l-len(data))
+        if not recieved:
+            logger.info("read_message disconnected %s", sock)
+            raise gevent.GreenletExit()
+        data += recieved
+
+    return rno, marshal.loads(data)
+
+def write_message(sock, rno, data):
+    data = marshal.dumps(data)
+    data = pack(">II", rno, len(data))+data
+    while data:
+        try:
+            sent = sock.send(data)
+        except socket.error, err:
+            if err.args[0] in disconnected_errors:
+                logger.debug("write_message disconnected %s", sock)
+                raise gevent.GreenletExit()
+            else:
+                raise
+        data = data[sent:]
+
+def writer(writeq, sock, multiplexer):
+    while 1:
+        rno, data = writeq.get()
+        try:
+            write_message(sock, rno, data)
+        except gevent.GreenletExit:
+            multiplexer.disconnected()
+            return
+
+class Worker:
+
+    def connected(self, socket, addr=None):
+        if addr is None:
+            addr = socket.getpeername()
+        logger.info('worker connected %s', addr)
+        self.addr = addr
+        self.readers = {}
+        writeq = gevent.queue.Queue()
+        gevent.Greenlet.spawn(writer, writeq, socket, self)
+        self.put = writeq.put
+        self.connected = True
+        return self.readers
+
+    def __len__(self):
+        return len(self.readers)
+
+    def start(self, rno):
+        readq = gevent.queue.Queue()
+        self.readers[rno] = readq.put
+        return readq.get
+
+    def end(self, rno):
+        del self.readers[rno]
+
+    class Disconnected(Exception):
+        pass
+
+    def put_disconnected(self, *a, **k):
+        raise self.Disconnected()
+
+    def disconnected(self):
+        logger.info('worker disconnected %s', self.addr)
+        self.connected = False
+        for put in self.readers.itervalues():
+            put(None)
+
+        self.put = self.put_disconnected


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/util.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1,111 @@
+import cStringIO
+import errno
+import gevent
+import gevent.hub
+import gevent.socket
+import logging
+import socket
+import sys
+import time
+import zc.resumelb.util
+
+logger = logging.getLogger(__name__)
+
+class Worker(zc.resumelb.util.Worker):
+
+    def __init__(self, app, addr, history):
+        self.app = app
+        self.resume = {}
+        self.time_ring_size = history
+        self.time_ring = []
+        self.time_ring_pos = 0
+        while 1:
+            try:
+                self.connect(addr)
+            except socket.error, err:
+                if err.args[0] == errno.ECONNREFUSED:
+                    gevent.sleep(1)
+                else:
+                    raise
+
+    def connect(self, addr):
+        socket = gevent.socket.create_connection(addr)
+        readers = self.connected(socket)
+
+        while self.connected:
+            try:
+                rno, data = zc.resumelb.util.read_message(socket)
+            except gevent.GreenletExit:
+                self.disconnected()
+                return
+
+            rput = readers.get(rno)
+            if rput is None:
+                env = data
+                env['zc.resumelb.time'] = time.time()
+                env['zc.resumelb.lb_addr'] = self.addr
+                gevent.Greenlet.spawn(self.handle, rno, self.start(rno), env)
+            else:
+                rput(data)
+
+    def handle(self, rno, get, env):
+        f = cStringIO.StringIO()
+        env['wsgi.input'] = f
+        env['wsgi.errors'] = sys.stderr
+
+        # XXX We're buffering input.  It maybe should to have option not to.
+        while 1:
+            data = get()
+            if data:
+                f.write(data)
+            elif data is None:
+                # Request cancelled (or worker disconnected)
+                self.end(rno)
+                return
+            else:
+                break
+
+        def start_response(status, headers, exc_info=None):
+            assert not exc_info # XXX
+            self.put((rno, (status, headers)))
+
+        try:
+            for data in self.app(env, start_response):
+                self.put((rno, data))
+
+            self.put((rno, ''))
+
+            elapsed = time.time() - env['zc.resumelb.time']
+            time_ring = self.time_ring
+            time_ring_pos = rno % self.time_ring_size
+            rclass = env['zc.resumelb.request_class']
+            try:
+                time_ring[time_ring_pos] = rclass, elapsed
+            except IndexError:
+                while len(time_ring) <= time_ring_pos:
+                    time_ring.append((rclass, elapsed))
+
+            if rno % self.time_ring_size == 0:
+                byrclass = {}
+                for rclass, elapsed in time_ring:
+                    sumn = byrclass.get(rclass)
+                    if sumn:
+                        sumn[0] += elapsed
+                        sumn[1] += 1
+                    else:
+                        byrclass[rclass] = [elapsed, 1]
+                self.resume = dict(
+                    (rclass, n/sum)
+                    for (rclass, (sum, n)) in byrclass.iteritems()
+                    )
+                self.put((0, self.resume))
+
+        except self.Disconnected:
+            return # whatever
+
+def server_runner(app, global_conf, lb, history=500): # paste deploy hook
+    logging.basicConfig(level=logging.INFO)
+    host, port = lb.split(':')
+    Worker(app, (host, int(port)), history)
+    gevent.hub.get_hub().switch()
+


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native



More information about the checkins mailing list