[Checkins] SVN: Sandbox/J1m/resumelb/ Working prototype.
Jim Fulton
jim at zope.com
Thu Oct 27 11:03:52 UTC 2011
Log message for revision 123166:
Working prototype.
Changed:
U Sandbox/J1m/resumelb/buildout.cfg
U Sandbox/J1m/resumelb/setup.py
A Sandbox/J1m/resumelb/src/zc/resumelb/
A Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py
A Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
A Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
A Sandbox/J1m/resumelb/src/zc/resumelb/util.py
A Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
-=-
Modified: Sandbox/J1m/resumelb/buildout.cfg
===================================================================
--- Sandbox/J1m/resumelb/buildout.cfg 2011-10-27 11:01:58 UTC (rev 123165)
+++ Sandbox/J1m/resumelb/buildout.cfg 2011-10-27 11:03:51 UTC (rev 123166)
@@ -1,12 +1,55 @@
[buildout]
develop = .
-parts = test py
+#parts = test py
+parts = gevent py ctl
+[ctl]
+recipe = zc.recipe.rhrc
+dest = ${buildout:bin-directory}
+parts = lb worker1 worker2 worker3
+
+[libevent]
+recipe = zc.recipe.cmmi
+url = https://github.com/downloads/libevent/libevent/libevent-2.0.14-stable.tar.gz
+
+[gevent]
+recipe = zc.recipe.egg:custom
+include-dirs = ${libevent:location}/include
+library-dirs = ${libevent:location}/lib
+
[test]
recipe = zc.recipe.testrunner
-eggs =
+eggs = zc.resumelb [test]
[py]
recipe = zc.recipe.egg
eggs = ${test:eggs}
+ PasteScript
+
interpreter = py
+
+[lb]
+recipe = zc.zdaemonrecipe
+worker_addr = :8000
+program = ${buildout:bin-directory}/resumelb :8080 ${:worker_addr}
+
+[worker.ini]
+recipe = zc.recipe.deployment:configuration
+text =
+ [app:main]
+ use = egg:bobo
+ bobo_resources = zc.resumelb.tests
+
+ [server:main]
+ use = egg:zc.resumelb
+ lb = ${lb:worker_addr}
+
+[worker1]
+recipe = zc.zdaemonrecipe
+program = ${buildout:bin-directory}/paster serve ${worker.ini:location}
+
+[worker2]
+<= worker1
+
+[worker3]
+<= worker1
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py 2011-10-27 11:01:58 UTC (rev 123165)
+++ Sandbox/J1m/resumelb/setup.py 2011-10-27 11:03:51 UTC (rev 123166)
@@ -11,12 +11,17 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-name, version = 'zc.', '0'
+name, version = 'zc.resumelb', '0'
-install_requires = ['setuptools']
-extras_require = dict(test=['zope.testing'])
+install_requires = ['setuptools', 'gevent']
+extras_require = dict(test=['zope.testing', 'bobo'])
entry_points = """
+[console_scripts]
+resumelb = zc.resumelb.lb:main
+
+[paste.server_runner]
+main = zc.resumelb.worker:server_runner
"""
from setuptools import setup
Added: Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py 2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1 @@
+#
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/__init__.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1,183 @@
+import bisect
+import gevent
+import gevent.hub
+import gevent.pywsgi
+import gevent.server
+import logging
+import sys
+import zc.resumelb.util
+
+block_size = 1<<16
+
+logger = logging.getLogger(__name__)
+
+class Server:
+
+ def __init__(self, wsgi_addr, worker_addr, classifier):
+ self.workers = {None: []}
+ self.workers_event = gevent.event.Event()
+ self.wsgi_addr = wsgi_addr
+ self.worker_addr = worker_addr
+ self.classifier = classifier
+ self.worker_server = gevent.server.StreamServer(
+ worker_addr, self.handle_worker)
+ self.worker_server.start()
+ self.wsgi_server = gevent.pywsgi.WSGIServer(wsgi_addr, self.handle_wsgi)
+ self.wsgi_server.start()
+
+ def handle_worker(self, socket, addr):
+ logger.info('new worker')
+ Worker(self, socket, addr)
+
+ def handle_wsgi(self, env, start_response):
+ rclass = self.classifier(env)
+ logger.debug('wsgi: %s', rclass)
+ env['zc.resumelb.request_class'] = rclass
+
+ while 1:
+ workers = self.workers.get(rclass)
+ if workers:
+ worker = workers[-1][1]
+ else:
+ workers = self.workers.get(None)
+ if workers:
+ worker = workers[-1][1]
+ assert rclass not in worker.resume
+ worker.unregister()
+ worker.resume[rclass] = 1
+ worker.register()
+ else:
+ self.workers_event.clear()
+ self.workers_event.wait()
+ continue
+
+ try:
+ return worker.handle(env, start_response)
+ except worker.Disconnected:
+ # XXX need to be more careful about whether
+ # start_response was called.
+ if int(env.get(CONTENT_LENGTH, 0)) == 0:
+ logger.info("retrying %s", env)
+ else:
+ raise
+
+class Worker(zc.resumelb.util.Worker):
+
+ def __init__(self, server, socket, addr):
+ self.server = server
+ self.nrequest = 0
+ self.resume = {}
+
+ readers = self.connected(socket, addr)
+ self.register()
+
+ while self.connected:
+ try:
+ rno, data = zc.resumelb.util.read_message(socket)
+ except gevent.GreenletExit:
+ self.disconnected()
+ return
+
+ if rno == 0:
+ self.unregister()
+ self.resume = data
+ self.register()
+ else:
+ readers[rno](data)
+
+ def handle(self, env, start_response):
+ logger.debug('handled by %s', self.addr)
+
+ env = env.copy()
+ err = env.pop('wsgi.errors')
+ input = env.pop('wsgi.input')
+
+ rno = self.nrequest = self.nrequest + 1
+ get = self.start(rno)
+
+ self.put((rno, env))
+ content_length = int(env.get('CONTENT_LENGTH', 0))
+ while content_length > 0:
+ data = input.read(min(content_length, block_size))
+ if not data:
+ # Browser disconnected, cancel the request
+ self.put((rno, None))
+ self.end(rno)
+ return
+ content_length -= len(data)
+ self.put((rno, data))
+ self.put((rno, ''))
+
+ data = get()
+ if data is None:
+ raise self.Disconnected()
+ logger.debug('start_response %r', data)
+ start_response(*data)
+
+ def content():
+ while 1:
+ data = get()
+ if data:
+ logger.debug('yield %r', data)
+ yield data
+ elif data is None:
+ raise self.Disconnected()
+ else:
+ self.end(rno)
+ break
+
+ return content()
+
+ def register(self):
+ resume = self.resume
+ self.score = - sum(resume.itervalues())
+ workers_by = self.server.workers
+
+ def _register(workers, score):
+ item = score, self
+ index = bisect.bisect_left(workers, item)
+ workers.insert(index, item)
+
+ _register(workers_by[None], self.score)
+
+ for site, score in resume.iteritems():
+ workers = workers_by.get(site)
+ if workers is None:
+ workers = workers_by[site] = []
+ _register(workers, score)
+
+ self.server.workers_event.set()
+
+ def unregister(self):
+ workers_by = self.server.workers
+
+ def _unregister(workers, score):
+ index = bisect.bisect_left(workers, (score, self))
+ del workers[index]
+
+ _unregister(workers_by[None], self.score)
+ for site, score in self.resume.iteritems():
+ _unregister(workers_by[site], score)
+
+ def disconnected(self):
+ self.unregister()
+ zc.resumelb.util.Worker.disconnected(self)
+
+def parse_addr(addr):
+ host, port = addr.split(':')
+ return host, int(port)
+
+def host_classifier(env):
+ return env.get("HTTP_HOST", '')
+
+def main(args=None):
+ if args is None:
+ args = sys.argv[1:]
+
+ logging.basicConfig(level=logging.INFO)
+ laddr, waddr = args
+ Server(parse_addr(laddr), parse_addr(waddr), host_classifier)
+
+ gevent.hub.get_hub().switch()
+
+
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/tests.py (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/tests.py 2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1,8 @@
+import bobo
+import os
+
+ at bobo.query
+def hi(bobo_request):
+ return "\n\n%s -> %s\n\n" % (
+ bobo_request.url, os.getpid())
+
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/tests.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: Sandbox/J1m/resumelb/src/zc/resumelb/util.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/util.py (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/util.py 2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1,94 @@
+from struct import pack, unpack
+import errno
+import gevent.queue
+import logging
+import marshal
+import socket
+
+logger = logging.getLogger(__name__)
+
+disconnected_errors = (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN,
+ errno.ESHUTDOWN, errno.ECONNABORTED)
+
+def read_message(sock):
+ data = ''
+ while len(data) < 8:
+ recieved = sock.recv(8-len(data))
+ if not recieved:
+ logger.info("read_message disconnected %s", sock)
+ raise gevent.GreenletExit()
+ data += recieved
+
+ rno, l = unpack(">II", data)
+
+ data = ''
+ while len(data) < l:
+ recieved = sock.recv(l-len(data))
+ if not recieved:
+ logger.info("read_message disconnected %s", sock)
+ raise gevent.GreenletExit()
+ data += recieved
+
+ return rno, marshal.loads(data)
+
+def write_message(sock, rno, data):
+ data = marshal.dumps(data)
+ data = pack(">II", rno, len(data))+data
+ while data:
+ try:
+ sent = sock.send(data)
+ except socket.error, err:
+ if err.args[0] in disconnected_errors:
+ logger.debug("write_message disconnected %s", sock)
+ raise gevent.GreenletExit()
+ else:
+ raise
+ data = data[sent:]
+
+def writer(writeq, sock, multiplexer):
+ while 1:
+ rno, data = writeq.get()
+ try:
+ write_message(sock, rno, data)
+ except gevent.GreenletExit:
+ multiplexer.disconnected()
+ return
+
+class Worker:
+
+ def connected(self, socket, addr=None):
+ if addr is None:
+ addr = socket.getpeername()
+ logger.info('worker connected %s', addr)
+ self.addr = addr
+ self.readers = {}
+ writeq = gevent.queue.Queue()
+ gevent.Greenlet.spawn(writer, writeq, socket, self)
+ self.put = writeq.put
+ self.connected = True
+ return self.readers
+
+ def __len__(self):
+ return len(self.readers)
+
+ def start(self, rno):
+ readq = gevent.queue.Queue()
+ self.readers[rno] = readq.put
+ return readq.get
+
+ def end(self, rno):
+ del self.readers[rno]
+
+ class Disconnected(Exception):
+ pass
+
+ def put_disconnected(self, *a, **k):
+ raise self.Disconnected()
+
+ def disconnected(self):
+ logger.info('worker disconnected %s', self.addr)
+ self.connected = False
+ for put in self.readers.itervalues():
+ put(None)
+
+ self.put = self.put_disconnected
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/util.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2011-10-27 11:03:51 UTC (rev 123166)
@@ -0,0 +1,111 @@
+import cStringIO
+import errno
+import gevent
+import gevent.hub
+import gevent.socket
+import logging
+import socket
+import sys
+import time
+import zc.resumelb.util
+
+logger = logging.getLogger(__name__)
+
+class Worker(zc.resumelb.util.Worker):
+
+ def __init__(self, app, addr, history):
+ self.app = app
+ self.resume = {}
+ self.time_ring_size = history
+ self.time_ring = []
+ self.time_ring_pos = 0
+ while 1:
+ try:
+ self.connect(addr)
+ except socket.error, err:
+ if err.args[0] == errno.ECONNREFUSED:
+ gevent.sleep(1)
+ else:
+ raise
+
+ def connect(self, addr):
+ socket = gevent.socket.create_connection(addr)
+ readers = self.connected(socket)
+
+ while self.connected:
+ try:
+ rno, data = zc.resumelb.util.read_message(socket)
+ except gevent.GreenletExit:
+ self.disconnected()
+ return
+
+ rput = readers.get(rno)
+ if rput is None:
+ env = data
+ env['zc.resumelb.time'] = time.time()
+ env['zc.resumelb.lb_addr'] = self.addr
+ gevent.Greenlet.spawn(self.handle, rno, self.start(rno), env)
+ else:
+ rput(data)
+
+ def handle(self, rno, get, env):
+ f = cStringIO.StringIO()
+ env['wsgi.input'] = f
+ env['wsgi.errors'] = sys.stderr
+
+ # XXX We're buffering input. It maybe should to have option not to.
+ while 1:
+ data = get()
+ if data:
+ f.write(data)
+ elif data is None:
+ # Request cancelled (or worker disconnected)
+ self.end(rno)
+ return
+ else:
+ break
+
+ def start_response(status, headers, exc_info=None):
+ assert not exc_info # XXX
+ self.put((rno, (status, headers)))
+
+ try:
+ for data in self.app(env, start_response):
+ self.put((rno, data))
+
+ self.put((rno, ''))
+
+ elapsed = time.time() - env['zc.resumelb.time']
+ time_ring = self.time_ring
+ time_ring_pos = rno % self.time_ring_size
+ rclass = env['zc.resumelb.request_class']
+ try:
+ time_ring[time_ring_pos] = rclass, elapsed
+ except IndexError:
+ while len(time_ring) <= time_ring_pos:
+ time_ring.append((rclass, elapsed))
+
+ if rno % self.time_ring_size == 0:
+ byrclass = {}
+ for rclass, elapsed in time_ring:
+ sumn = byrclass.get(rclass)
+ if sumn:
+ sumn[0] += elapsed
+ sumn[1] += 1
+ else:
+ byrclass[rclass] = [elapsed, 1]
+ self.resume = dict(
+ (rclass, n/sum)
+ for (rclass, (sum, n)) in byrclass.iteritems()
+ )
+ self.put((0, self.resume))
+
+ except self.Disconnected:
+ return # whatever
+
+def server_runner(app, global_conf, lb, history=500): # paste deploy hook
+ logging.basicConfig(level=logging.INFO)
+ host, port = lb.split(':')
+ Worker(app, (host, int(port)), history)
+ gevent.hub.get_hub().switch()
+
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
More information about the checkins
mailing list