[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Fixed: Temporary files created when buffering data in the load

jim cvs-admin at zope.org
Wed May 9 21:42:23 UTC 2012


Log message for revision 125792:
  Fixed: Temporary files created when buffering data in the load
  balancers weren't closed explicitly.  Generally, they were closed
  through garbage collection, but in certain situations, their numbers
  could build quickly, leading to file-descriptor exhaustion.
  
  Moved some more helper functions from test files to the tests module for reuse.
  
  Added spawn helper because gevent greenlet exception reporting sucks.
  
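As background for the first item, the pattern looks roughly like the sketch
below. This is illustrative only (SpoolBuffer and response_iter are
hypothetical names, not part of zc.resumelb): a buffer backed by
tempfile.TemporaryFile ties up a file descriptor until the object is garbage
collected, so a busy load balancer can run out of descriptors before GC
catches up. Closing the buffer explicitly in the response iterator's finally
clause releases the descriptor as soon as the request ends.

    import tempfile

    class SpoolBuffer:
        # Hypothetical stand-in for a read queue that has spilled to a
        # temporary file (roughly the role of zc.resumelb.util.Buffer).
        def __init__(self):
            self.file = tempfile.TemporaryFile()

        def add(self, data):
            self.file.write(data)

        def close(self):
            # Release the descriptor now instead of waiting for GC.
            self.file.close()

    def response_iter(buffer, get):
        # Yield response data and close the buffer when the iterator is
        # exhausted or closed, so descriptors can't pile up under load.
        try:
            while 1:
                data = get()
                if not data:
                    break
                yield data
        finally:
            buffer.close()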

Changed:
  U   zc.resumelb/trunk/src/zc/resumelb/README.txt
  U   zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test
  U   zc.resumelb/trunk/src/zc/resumelb/lb.py
  U   zc.resumelb/trunk/src/zc/resumelb/lb.test
  U   zc.resumelb/trunk/src/zc/resumelb/tests.py
  U   zc.resumelb/trunk/src/zc/resumelb/util.py
  U   zc.resumelb/trunk/src/zc/resumelb/worker.py

-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/README.txt
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-05-09 21:42:19 UTC (rev 125792)
@@ -242,9 +242,14 @@
 Change History
 ==============
 
-0.5.2 (2012-05-??)
+0.5.2 (2012-05-09)
 ------------------
 
+- Fixed: Temporary files created when buffering data in the load
+  balancers weren't closed explicitly.  Generally, they were closed
+  through garbage collection, but in certain situations, their numbers
+  could build quickly, leading to file-descriptor exhaustion.
+
 - Fixed: Tracelog 'I' records didn't always contain input length information.
 
 - Fixed: Tracelog 'I' records were only included when using thread pools.

Modified: zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test	2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test	2012-05-09 21:42:19 UTC (rev 125792)
@@ -29,9 +29,9 @@
     >>> q.qsize()
     0
 
-If it hasn't started bufferng yet, it doesn't have a close method:
+If it hasn't started buffering yet, its queue is not a Buffer:
 
-    >>> hasattr(q, 'close')
+    >>> isinstance(q.queue, zc.resumelb.util.Buffer)
     False
 
 In testing, the underlying queue max size is only 999 bytes.
@@ -41,11 +41,11 @@
 bytes in the queue are over the limit:
 
     >>> q.put('a'*1000)
-    >>> hasattr(q, 'close')
+    >>> isinstance(q.queue, zc.resumelb.util.Buffer)
     False
 
     >>> q.put('b'*1000)
-    >>> hasattr(q, 'close')
+    >>> isinstance(q.queue, zc.resumelb.util.Buffer)
     True
 
 Now, we've triggered the buffering. We can keep adding data:

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-05-09 21:42:19 UTC (rev 125792)
@@ -272,13 +272,10 @@
     worker.dbacklog = worker.dbacklog*decay + worker.backlog
     worker.nbacklog = worker.nbacklog*decay + 1
     worker.mbacklog = worker.dbacklog / worker.nbacklog
+class Worker(zc.resumelb.util.LBWorker):
 
-class Worker(zc.resumelb.util.Worker):
-
     maxrno = (1<<32) - 1
 
-    ReadQueue = zc.resumelb.util.BufferedQueue
-
     def __init__(self, pool, socket, addr):
         self.pool = pool
         self.nrequest = 0
@@ -324,7 +321,7 @@
         self.nrequest = rno % self.maxrno
         self.requests[rno] = time.time()
         try:
-            get = self.start(rno)
+            get = self.start(rno).get
             put = self.put
             try:
                 put((rno, env))
@@ -352,10 +349,12 @@
 
             def content():
                 try:
+                    # We yield a first value to get into the try so that
+                    # the generator's close() will execute the finally block. :(
+                    yield 1
                     while 1:
                         data = get()
                         if data:
-                            #logger.debug('yield %r', data)
                             yield data
                         else:
                             if data is None:
@@ -365,7 +364,10 @@
                 finally:
                     self.end(rno)
 
-            return content()
+            # See the "yield 1" comment above. :(
+            content = content()
+            content.next()
+            return content
         finally:
             del self.requests[rno]
 

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-05-09 21:42:19 UTC (rev 125792)
@@ -10,17 +10,6 @@
 
 To test lb behavior, we'll create faux workers the lb can connect to.
 
-    >>> import gevent.server
-    >>> class Worker:
-    ...     def __init__(self):
-    ...         self.server = server = gevent.server.StreamServer(
-    ...             ('127.0.0.1', 0), self.handle)
-    ...         server.start()
-    ...         self.addr = '127.0.0.1', server.server_port
-    ...     def handle(self, socket, addr):
-    ...         self.socket = socket
-
-
     >>> workers = [Worker() for i in range(2)]
 
 We have some workers running. Now, let's create a load balancer:
@@ -50,6 +39,8 @@
 
     >>> write_message(worker1, 0, {'h1.com': 10.0})
     >>> write_message(worker2, 0, {'h2.com': 10.0})
+
+    >>> import gevent
     >>> gevent.sleep(.01) # Give resumes time to arrive
 
 Now, let's make a request and make sure the data gets where it's
@@ -57,7 +48,7 @@
 
     >>> import webtest
     >>> app1 = webtest.TestApp(lb.handle_wsgi)
-    >>> g1 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+    >>> g1 = spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
 
     >>> rno, env1 = read_message(worker1)
 
@@ -92,7 +83,7 @@
 socket.  This time, we'll make a request that provides a large body:
 
     >>> app2 = webtest.TestApp(lb.handle_wsgi)
-    >>> g2 = gevent.spawn(
+    >>> g2 = spawn(
     ...     app2.put, '/hi.html', 'i'*200000, [('Host', 'h1.com')])
 
     >>> rno, env2 = read_message(worker1)
@@ -143,7 +134,7 @@
 If we make a request to h2.com, we'll get the request on worker2:
 
     >>> app3 = webtest.TestApp(lb.handle_wsgi)
-    >>> g3 = gevent.spawn(app3.get, '/hi.html', {}, [('Host', 'h2.com')])
+    >>> g3 = spawn(app3.get, '/hi.html', {}, [('Host', 'h2.com')])
 
     >>> rno, env3 = read_message(worker2)
     >>> rno
@@ -252,7 +243,7 @@
     >>> import time
     >>> t1 = time.time()
     >>> app1 = webtest.TestApp(lb.handle_wsgi)
-    >>> g1 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+    >>> g1 = spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
     >>> rno = read_message(worker1)[0]
     >>> read_message(worker1) == (rno, '')
     True
@@ -264,7 +255,7 @@
 
     >>> gevent.sleep(.01)
     >>> app2 = webtest.TestApp(lb.handle_wsgi)
-    >>> g2 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+    >>> g2 = spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
     >>> gevent.sleep(.01)
     >>> [ot] == [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
     True
@@ -308,7 +299,7 @@
     ...                'OPTIONS', 'TRACE'):
     ...     app = webtest.TestApp(lb.handle_wsgi)
     ...     greenlets.append(
-    ...         gevent.spawn(app.request, '/hi.html', method=method,
+    ...         spawn(app.request, '/hi.html', method=method,
     ...                      headers=[('Host', 'h1.com')], status='*'))
     ...     rno, data = read_message(worker1)
     ...     rno2, blank = read_message(worker1)
@@ -457,7 +448,7 @@
 
 If we submit an h4.com request, it will go to the new worker:
 
-    >>> g = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h4.com')])
+    >>> g = spawn(app1.get, '/hi.html', {}, [('Host', 'h4.com')])
     >>> rno, env = read_message(workers[-1].socket)
     >>> read_message(workers[-1].socket)
     (1, '')
@@ -501,7 +492,7 @@
 We already have a request in flight.  Let's add another on a different
 worker:
 
-    >>> g2 = gevent.spawn(app2.get, '/hi.html', {}, [('Host', 'h2.com')])
+    >>> g2 = spawn(app2.get, '/hi.html', {}, [('Host', 'h2.com')])
 
     >>> gevent.sleep(.01)
     >>> [w.backlog for w in lb.pool.workers]
@@ -510,7 +501,7 @@
 If we call shutdown, it will block until we have no in-flight
 connections, so we'll call it in a greenlet:
 
-    >>> shutdown_greenlet = gevent.spawn(lb.shutdown)
+    >>> shutdown_greenlet = spawn(lb.shutdown)
     >>> gevent.sleep(.01)
     >>> shutdown_greenlet.ready()
     False

Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-05-09 21:42:19 UTC (rev 125792)
@@ -14,6 +14,7 @@
 import bobo
 import doctest
 import gevent
+import gevent.server
 import gevent.socket
 import hashlib
 import manuel.capture
@@ -28,6 +29,8 @@
 import traceback
 import unittest
 import webob
+import webtest
+import zc.resumelb.lb
 import zc.resumelb.util
 import zc.resumelb.worker
 import zc.zk.testing
@@ -125,6 +128,24 @@
     if size_only:
        print size
 
+def spawn(func, *a, **kw):
+    def run_func():
+        try:
+            return func(*a, **kw)
+        except Exception:
+            traceback.print_exc()
+            raise
+    return gevent.spawn(run_func)
+
+class FauxWorker:
+    def __init__(self):
+        self.server = server = gevent.server.StreamServer(
+            ('127.0.0.1', 0), self.handle)
+        server.start()
+        self.addr = '127.0.0.1', server.server_port
+    def handle(self, socket, addr):
+        self.socket = socket
+
 def test_loading_recipes_with_no_history_argument():
     """A bug as introduced that caused resumes to be loaded
     incorrectly when no history was given to the constructor.  It
@@ -180,9 +201,52 @@
 
     >>> handler.uninstall()
     >>> worker.stop()
+    """ #"
+
+def Buffering_Temporary_Files_are_closed():
     """
+    When a worker sends data to an lb faster than it can send it to a
+    browser, the data gets buffered in a temporary file.  When the
+    request is done, the temporary file is explicitly closed.
 
+    >>> worker = FauxWorker()
+    >>> lb = zc.resumelb.lb.LB([worker.addr], zc.resumelb.lb.host_classifier)
+    >>> wait(lambda : hasattr(worker, 'socket'))
+    >>> zc.resumelb.util.write_message(worker.socket, 0, {})
+    >>> wait(lambda : lb.pool.workers)
 
+Now make a request that doesn't read data, but waits until we tell it
+to close its iterator:
+
+    >>> event = gevent.event.Event()
+    >>> @spawn
+    ... def client():
+    ...     def start(*a):
+    ...         print 'start_response', a
+    ...     body = lb.handle_wsgi(
+    ...         webob.Request.blank('/hi.html').environ, start)
+    ...     event.wait()
+    ...     body.close()
+    ...     print 'closed body'
+
+Now, we'll send it enough data to make it use a temporary file:
+
+    >>> [lbworker] = list(lb.pool.workers)
+    >>> wait(lambda : lbworker.queues)
+
+    >>> zc.resumelb.util.write_message(
+    ...     worker.socket, 1, ('200 OK', []), 'x'*10000, 'x'*10000)
+
+    >>> wait(lambda : hasattr(lbworker.queues[1].queue, 'file'))
+    start_response ('200 OK', [])
+
+    >>> f = lbworker.queues[1].queue.file
+    >>> event.set()
+    >>> wait(lambda : f.closed)
+    closed body
+
+    """
+
 def test_classifier(env):
     return "yup, it's a test"
 
@@ -203,6 +267,8 @@
     zc.resumelb.util.queue_size_bytes = 999
     test.globs['newenv'] = newenv
     test.globs['print_response'] = print_response
+    test.globs['spawn'] = spawn
+    test.globs['Worker'] = FauxWorker
 
 def zkSetUp(test):
     setUp(test)

Modified: zc.resumelb/trunk/src/zc/resumelb/util.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/util.py	2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/util.py	2012-05-09 21:42:19 UTC (rev 125792)
@@ -115,6 +115,9 @@
     def qsize(self):
         return self.queue.qsize()
 
+    def close(self):
+        pass
+
 class Buffer:
 
     size = size_bytes = read_position = write_position = 0
@@ -181,7 +184,7 @@
                             self.size_bytes -= len(data)
                         self.size -= 1
                     else:
-                        assert size == -1
+                        assert self.size == -1
 
 
 class Worker:
@@ -206,15 +209,13 @@
     def start(self, rno):
         readq = self.ReadQueue()
         self.readers[rno] = readq.put
-        return readq.get
+        return readq
 
     def end(self, rno):
         try:
             queue = self.readers.pop(rno)
         except KeyError:
             return # previously cancelled
-        if hasattr(queue, 'close'):
-            queue.close()
 
     def put_disconnected(self, *a, **k):
         raise Disconnected()
@@ -226,3 +227,22 @@
             put(None)
 
         self.put = self.put_disconnected
+
+class LBWorker(Worker):
+
+    ReadQueue = BufferedQueue
+
+    def connected(self, socket, addr=None):
+        self.queues = {}
+        return Worker.connected(self, socket, addr)
+
+    def start(self, rno):
+        self.queues[rno] = queue = Worker.start(self, rno)
+        return queue
+
+    def end(self, rno):
+        try:
+            queue = self.readers.pop(rno)
+        except KeyError:
+            return # previously cancelled
+        self.queues.pop(rno).close()

Modified: zc.resumelb/trunk/src/zc/resumelb/worker.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/worker.py	2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/worker.py	2012-05-09 21:42:19 UTC (rev 125792)
@@ -189,7 +189,7 @@
                         env = data
                         env['zc.resumelb.lb_addr'] = addr
                         gevent.spawn(
-                            self.handle, conn, rno, conn.start(rno), env)
+                            self.handle, conn, rno, conn.start(rno).get, env)
                 else:
                     rput(data)
                     if data is None:


