[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ - The status server provided when using ZooKeeper now includes the
jim
cvs-admin at zope.org
Mon Apr 23 20:19:34 UTC 2012
Log message for revision 125239:
- The status server provided when using ZooKeeper now includes the
start time of the oldest request for each worker, to be used for
monitoring.
Changed:
U zc.resumelb/trunk/src/zc/resumelb/README.txt
U zc.resumelb/trunk/src/zc/resumelb/lb.py
U zc.resumelb/trunk/src/zc/resumelb/lb.test
U zc.resumelb/trunk/src/zc/resumelb/zk.py
U zc.resumelb/trunk/src/zc/resumelb/zk.test
-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/README.txt
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/README.txt 2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/README.txt 2012-04-23 20:19:30 UTC (rev 125239)
@@ -256,6 +256,10 @@
worker scores checking a maximum backlog, we subtract 1 from the
worker's backlog if it's non-zero.
+- The status server provided when using ZooKeeper now includes the
+ start time of the oldest request for each worker, to be used for
+ monitoring.
+
0.3.0 (2012-03-28)
------------------
Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py 2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py 2012-04-23 20:19:30 UTC (rev 125239)
@@ -7,6 +7,7 @@
import logging
import re
import sys
+import time
import webob
import zc.parse_addr
import zc.resumelb.util
@@ -279,6 +280,7 @@
def __init__(self, pool, socket, addr):
self.pool = pool
self.nrequest = 0
+ self.requests = {}
self.__name__ = '%s:%s' % addr
readers = self.connected(socket, addr)
@@ -300,6 +302,11 @@
else:
reader(data)
+ @property
+ def oldest_time(self):
+ if self.requests:
+ return min(self.requests.itervalues())
+
def __repr__(self):
return self.__name__
@@ -313,47 +320,52 @@
rno = self.nrequest + 1
self.nrequest = rno % self.maxrno
- get = self.start(rno)
+ self.requests[rno] = time.time()
try:
- self.put((rno, env))
- content_length = int(env.get('CONTENT_LENGTH', 0))
- while content_length > 0:
- data = input.read(min(content_length, block_size))
- if not data:
- # Browser disconnected, cancel the request
- self.put((rno, None))
- self.end(rno)
- return
- content_length -= len(data)
- self.put((rno, data))
- self.put((rno, ''))
+ get = self.start(rno)
+ try:
+ self.put((rno, env))
+ content_length = int(env.get('CONTENT_LENGTH', 0))
+ while content_length > 0:
+ data = input.read(min(content_length, block_size))
+ if not data:
+ # Browser disconnected, cancel the request
+ self.put((rno, None))
+ self.end(rno)
+ return
+ content_length -= len(data)
+ self.put((rno, data))
+ self.put((rno, ''))
- data = get()
- if data is None:
- raise zc.resumelb.util.Disconnected()
- logger.debug('start_response %r', data)
- start_response(*data)
- except:
- # not using finally here, because we only want to end on error
- self.end(rno)
- raise
-
- def content():
- try:
- while 1:
- data = get()
- if data:
- logger.debug('yield %r', data)
- yield data
- else:
- if data is None:
- logger.warning('Disconnected while returning body')
- break
- finally:
+ data = get()
+ if data is None:
+ raise zc.resumelb.util.Disconnected()
+ logger.debug('start_response %r', data)
+ start_response(*data)
+ except:
+ # not using finally here, because we only want to end on error
self.end(rno)
+ raise
- return content()
+ def content():
+ try:
+ while 1:
+ data = get()
+ if data:
+ logger.debug('yield %r', data)
+ yield data
+ else:
+ if data is None:
+ logger.warning(
+ 'Disconnected while returning body')
+ break
+ finally:
+ self.end(rno)
+ return content()
+ finally:
+ del self.requests[rno]
+
def disconnected(self):
self.pool.remove(self)
zc.resumelb.util.Worker.disconnected(self)
Modified: zc.resumelb/trunk/src/zc/resumelb/lb.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.test 2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.test 2012-04-23 20:19:30 UTC (rev 125239)
@@ -237,7 +237,62 @@
overall backlog: 0 Decayed: 1.47809138734 Avg: 0.73904569367
0: [127.0.0.1:0, 127.0.0.1:0]
+Worker oldest-request start times
+=================================
+For monitoring purposes, we want to know about requests that take a
+very long time. Load-balancer worker objects keep track of
+outstanding requests. We can query this information:
+
+ >>> for worker in sorted(lb.pool.workers):
+ ... print worker, worker.oldest_time
+ 127.0.0.1:0 None
+ 127.0.0.1:0 None
+
+ >>> import time
+ >>> t1 = time.time()
+ >>> app1 = webtest.TestApp(lb.handle_wsgi)
+ >>> g1 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+ >>> rno = read_message(worker1)[0]
+ >>> read_message(worker1) == (rno, '')
+ True
+
+ >>> t2 = time.time()
+ >>> [ot] = [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
+ >>> t1 <= ot <= t2
+ True
+
+ >>> gevent.sleep(.01)
+ >>> app2 = webtest.TestApp(lb.handle_wsgi)
+ >>> g2 = gevent.spawn(app2.get, '/hi.html', {}, [('Host', 'h1.com')])
+ >>> gevent.sleep(.01)
+ >>> [ot] == [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
+ True
+
+ >>> response = webob.Response('Hello world\n')
+ >>> write_message(worker1, rno, (response.status, response.headers.items()))
+ >>> write_message(worker1, rno, response.body)
+ >>> write_message(worker1, rno, '')
+ >>> g1.join()
+ >>> [ot2] = [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
+ >>> ot2 > ot
+ True
+
+ >>> rno = read_message(worker1)[0]
+ >>> read_message(worker1) == (rno, '')
+ True
+ >>> response = webob.Response('Hello world\n')
+ >>> write_message(worker1, rno, (response.status, response.headers.items()))
+ >>> write_message(worker1, rno, response.body)
+ >>> write_message(worker1, rno, '')
+ >>> g2.join()
+
+ >>> for worker in sorted(lb.pool.workers):
+ ... print worker, worker.oldest_time
+ 127.0.0.1:0 None
+ 127.0.0.1:0 None
+
+
Worker disconnection
====================
@@ -260,18 +315,18 @@
... if rno2 != rno or blank != '':
... print 'oops', (rno2, blank)
... print rno, type(data)
- 3 <type 'dict'>
- 4 <type 'dict'>
5 <type 'dict'>
6 <type 'dict'>
7 <type 'dict'>
8 <type 'dict'>
9 <type 'dict'>
10 <type 'dict'>
+ 11 <type 'dict'>
+ 12 <type 'dict'>
Now, let's send a partial response from the first GET:
- >>> write_message(worker1, 3, ('200 OK', [
+ >>> write_message(worker1, 5, ('200 OK', [
... ('Content-Length', '42'), ('Content-Type', 'text/html')]))
>>> gevent.sleep(.01)
Modified: zc.resumelb/trunk/src/zc/resumelb/zk.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/zk.py 2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/zk.py 2012-04-23 20:19:30 UTC (rev 125239)
@@ -219,7 +219,11 @@
backlog = pool.backlog,
mean_backlog = pool.mbacklog,
workers = [
- (worker.__name__, worker.backlog, worker.mbacklog)
+ (worker.__name__,
+ worker.backlog,
+ worker.mbacklog,
+ worker.oldest_time
+ )
for worker in sorted(
pool.workers, key=lambda w: w.__name__)
]
Modified: zc.resumelb/trunk/src/zc/resumelb/zk.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/zk.test 2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/zk.test 2012-04-23 20:19:30 UTC (rev 125239)
@@ -239,7 +239,6 @@
... if a.startswith('localhost:')]
[]
-
Then we'll make a simpler GET request:
>>> import gevent.socket
@@ -291,8 +290,10 @@
0,
...]]}
- >>> sorted(status['workers'], key=lambda w: w[2]) # doctest: +ELLIPSIS
- [[u'127.0.0.1:...', 0, 0], [u'127.0.0.1:...', 0, 0.48...]]
+ >>> sorted(status['workers'], key=lambda w: w[2])
+ ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ [[u'127.0.0.1:...', 0, 0, None],
+ [u'127.0.0.1:...', 0, 0.48..., None]]
>>> status_file.close()
>>> status_socket.close()
More information about the checkins
mailing list