[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ - The status server provided when using ZooKeeper now includes the start time of the oldest request for each worker, to be used for monitoring.

jim cvs-admin at zope.org
Mon Apr 23 20:19:34 UTC 2012


Log message for revision 125239:
  - The status server provided when using ZooKeeper now includes the
    start time of the oldest request for each worker, to be used for
    monitoring.
  

Changed:
  U   zc.resumelb/trunk/src/zc/resumelb/README.txt
  U   zc.resumelb/trunk/src/zc/resumelb/lb.py
  U   zc.resumelb/trunk/src/zc/resumelb/lb.test
  U   zc.resumelb/trunk/src/zc/resumelb/zk.py
  U   zc.resumelb/trunk/src/zc/resumelb/zk.test

-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/README.txt
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-04-23 20:19:30 UTC (rev 125239)
@@ -256,6 +256,10 @@
   worker scrores chacking a maximum backlog, we subtract 1 from the
   worker's backlog if it's non-zero.
 
+- The status server provided when using ZooKeeper now includes the
+  start time of the oldest request for each worker, to be used for
+  monitoring.
+
 0.3.0 (2012-03-28)
 ------------------
 

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-04-23 20:19:30 UTC (rev 125239)
@@ -7,6 +7,7 @@
 import logging
 import re
 import sys
+import time
 import webob
 import zc.parse_addr
 import zc.resumelb.util
@@ -279,6 +280,7 @@
     def __init__(self, pool, socket, addr):
         self.pool = pool
         self.nrequest = 0
+        self.requests = {}
         self.__name__ = '%s:%s' % addr
 
         readers = self.connected(socket, addr)
@@ -300,6 +302,11 @@
                 else:
                     reader(data)
 
+    @property
+    def oldest_time(self):
+        if self.requests:
+            return min(self.requests.itervalues())
+
     def __repr__(self):
         return self.__name__
 
@@ -313,47 +320,52 @@
 
         rno = self.nrequest + 1
         self.nrequest = rno % self.maxrno
-        get = self.start(rno)
+        self.requests[rno] = time.time()
         try:
-            self.put((rno, env))
-            content_length = int(env.get('CONTENT_LENGTH', 0))
-            while content_length > 0:
-                data = input.read(min(content_length, block_size))
-                if not data:
-                    # Browser disconnected, cancel the request
-                    self.put((rno, None))
-                    self.end(rno)
-                    return
-                content_length -= len(data)
-                self.put((rno, data))
-            self.put((rno, ''))
+            get = self.start(rno)
+            try:
+                self.put((rno, env))
+                content_length = int(env.get('CONTENT_LENGTH', 0))
+                while content_length > 0:
+                    data = input.read(min(content_length, block_size))
+                    if not data:
+                        # Browser disconnected, cancel the request
+                        self.put((rno, None))
+                        self.end(rno)
+                        return
+                    content_length -= len(data)
+                    self.put((rno, data))
+                self.put((rno, ''))
 
-            data = get()
-            if data is None:
-                raise zc.resumelb.util.Disconnected()
-            logger.debug('start_response %r', data)
-            start_response(*data)
-        except:
-            # not using finally here, because we only want to end on error
-            self.end(rno)
-            raise
-
-        def content():
-            try:
-                while 1:
-                    data = get()
-                    if data:
-                        logger.debug('yield %r', data)
-                        yield data
-                    else:
-                        if data is None:
-                            logger.warning('Disconnected while returning body')
-                        break
-            finally:
+                data = get()
+                if data is None:
+                    raise zc.resumelb.util.Disconnected()
+                logger.debug('start_response %r', data)
+                start_response(*data)
+            except:
+                # not using finally here, because we only want to end on error
                 self.end(rno)
+                raise
 
-        return content()
+            def content():
+                try:
+                    while 1:
+                        data = get()
+                        if data:
+                            logger.debug('yield %r', data)
+                            yield data
+                        else:
+                            if data is None:
+                                logger.warning(
+                                    'Disconnected while returning body')
+                            break
+                finally:
+                    self.end(rno)
 
+            return content()
+        finally:
+            del self.requests[rno]
+
     def disconnected(self):
         self.pool.remove(self)
         zc.resumelb.util.Worker.disconnected(self)

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-04-23 20:19:30 UTC (rev 125239)
@@ -237,7 +237,62 @@
       overall backlog: 0 Decayed: 1.47809138734 Avg: 0.73904569367
       0: [127.0.0.1:0, 127.0.0.1:0]
 
+Worker oldest-request start times
+=================================
 
+For monitoring purposes, we want to know about requests that take a
+very long time.  Load-balancer worker objects keep track of
+outstanding requests.  We can query this information:
+
+    >>> for worker in sorted(lb.pool.workers):
+    ...     print worker, worker.oldest_time
+    127.0.0.1:0 None
+    127.0.0.1:0 None
+
+    >>> import time
+    >>> t1 = time.time()
+    >>> app1 = webtest.TestApp(lb.handle_wsgi)
+    >>> g1 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+    >>> rno = read_message(worker1)[0]
+    >>> read_message(worker1) == (rno, '')
+    True
+
+    >>> t2 = time.time()
+    >>> [ot] = [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
+    >>> t1 <= ot <= t2
+    True
+
+    >>> gevent.sleep(.01)
+    >>> app2 = webtest.TestApp(lb.handle_wsgi)
+    >>> g2 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+    >>> gevent.sleep(.01)
+    >>> [ot] == [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
+    True
+
+    >>> response = webob.Response('Hello world\n')
+    >>> write_message(worker1, rno, (response.status, response.headers.items()))
+    >>> write_message(worker1, rno, response.body)
+    >>> write_message(worker1, rno, '')
+    >>> g1.join()
+    >>> [ot2] = [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
+    >>> ot2 > ot
+    True
+
+    >>> rno = read_message(worker1)[0]
+    >>> read_message(worker1) == (rno, '')
+    True
+    >>> response = webob.Response('Hello world\n')
+    >>> write_message(worker1, rno, (response.status, response.headers.items()))
+    >>> write_message(worker1, rno, response.body)
+    >>> write_message(worker1, rno, '')
+    >>> g2.join()
+
+    >>> for worker in sorted(lb.pool.workers):
+    ...     print worker, worker.oldest_time
+    127.0.0.1:0 None
+    127.0.0.1:0 None
+
+
 Worker disconnection
 ====================
 
@@ -260,18 +315,18 @@
     ...     if rno2 != rno or blank != '':
     ...         print 'oops', (rno2, blank)
     ...     print rno, type(data)
-    3 <type 'dict'>
-    4 <type 'dict'>
     5 <type 'dict'>
     6 <type 'dict'>
     7 <type 'dict'>
     8 <type 'dict'>
     9 <type 'dict'>
     10 <type 'dict'>
+    11 <type 'dict'>
+    12 <type 'dict'>
 
 Now, let's send a partial response from the first GET:
 
-    >>> write_message(worker1, 3, ('200 OK', [
+    >>> write_message(worker1, 5, ('200 OK', [
     ...   ('Content-Length', '42'), ('Content-Type', 'text/html')]))
     >>> gevent.sleep(.01)
 

Modified: zc.resumelb/trunk/src/zc/resumelb/zk.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/zk.py	2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/zk.py	2012-04-23 20:19:30 UTC (rev 125239)
@@ -219,7 +219,11 @@
                     backlog = pool.backlog,
                     mean_backlog = pool.mbacklog,
                     workers = [
-                        (worker.__name__, worker.backlog, worker.mbacklog)
+                        (worker.__name__,
+                         worker.backlog,
+                         worker.mbacklog,
+                         worker.oldest_time
+                         )
                         for worker in sorted(
                             pool.workers, key=lambda w: w.__name__)
                         ]

Modified: zc.resumelb/trunk/src/zc/resumelb/zk.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/zk.test	2012-04-23 17:25:37 UTC (rev 125238)
+++ zc.resumelb/trunk/src/zc/resumelb/zk.test	2012-04-23 20:19:30 UTC (rev 125239)
@@ -239,7 +239,6 @@
     ...  if a.startswith('localhost:')]
     []
 
-
 Then we'll make a simpler GET request:
 
     >>> import gevent.socket
@@ -291,8 +290,10 @@
                    0,
                    ...]]}
 
-    >>> sorted(status['workers'], key=lambda w: w[2]) # doctest: +ELLIPSIS
-    [[u'127.0.0.1:...', 0, 0], [u'127.0.0.1:...', 0, 0.48...]]
+    >>> sorted(status['workers'], key=lambda w: w[2])
+    ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    [[u'127.0.0.1:...', 0, 0, None],
+     [u'127.0.0.1:...', 0, 0.48..., None]]
 
     >>> status_file.close()
     >>> status_socket.close()



More information about the checkins mailing list