[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Added buffered queues that never block on put and overflow to disk if

jim cvs-admin at zope.org
Thu Apr 26 18:10:30 UTC 2012


Log message for revision 125305:
  Added buffered queues that never block on put and overflow to disk if
  necessary to avoid storing too much in RAM.
  
  Found and debugged problems in ByteSizedQueue handling of empty
  strings and None.
  

Changed:
  U   zc.resumelb/trunk/src/zc/resumelb/README.txt
  A   zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test
  U   zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test
  U   zc.resumelb/trunk/src/zc/resumelb/lb.py
  U   zc.resumelb/trunk/src/zc/resumelb/lb.test
  U   zc.resumelb/trunk/src/zc/resumelb/tests.py
  U   zc.resumelb/trunk/src/zc/resumelb/util.py

-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/README.txt
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-04-26 08:49:00 UTC (rev 125304)
+++ zc.resumelb/trunk/src/zc/resumelb/README.txt	2012-04-26 18:10:27 UTC (rev 125305)
@@ -242,7 +242,7 @@
 Change History
 ==============
 
-0.4.0 (2012-04-24)
+0.4.0 (2012-04-26)
 ------------------
 
 - Change the load-balancing algorithm to take backlogs of
@@ -266,6 +266,9 @@
 - Fixed: Workers buffered large request bodies in memory.  Now large
   request bodies are buffered to disk.
 
+- Internal optimizations, especially with regard to handling large
+  request and response bodies.
+
 0.3.0 (2012-03-28)
 ------------------
 

Added: zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test	                        (rev 0)
+++ zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test	2012-04-26 18:10:27 UTC (rev 125305)
@@ -0,0 +1,132 @@
+Buffered Queues
+================
+
+The resumelb uses queues to interface between multiplexed worker
+connections and client web connections.  For very large responses, you
+could get into a situation where a worker sends data faster than a
+client can receive it and the data ends up in a queue in
+memory. Specifically, this is the read queue used in an LB handler to
+get data from the worker.  The handler only pulls data from this
+queue as fast as the WSGI server can send it to the client.  We don't
+want this queue to get backed up, as that will back up other requests
+on the **multiplexed** connection to the worker.
+
+In the LB, handlers have BufferedQueue read queues.  A buffered queue
+wraps a ByteSizedQueue and avoids blocking by writing data to a
+temporary file, if necessary.
+
+    >>> import zc.resumelb.util
+
+    >>> q = zc.resumelb.util.BufferedQueue()
+
+BufferedQueue only implements the ``put``, ``get``, and ``qsize``
+queue methods.
+
+    >>> q.put('a')
+    >>> q.qsize()
+    1
+    >>> q.get()
+    'a'
+    >>> q.qsize()
+    0
+
+If it hasn't started buffering yet, it doesn't have a close method:
+
+    >>> hasattr(q, 'close')
+    False
+
+In testing, the underlying queue max size is only 999 bytes.
+
+If we add 1000 bytes, we won't have triggered the buffering, because
+the underlying ByteSizedQueue doesn't block until the bytes in the
+queue are over the limit:
+
+    >>> q.put('a'*1000)
+    >>> hasattr(q, 'close')
+    False
+
+    >>> q.put('b'*1000)
+    >>> hasattr(q, 'close')
+    True
+
+Now, we've triggered the buffering. We can keep adding data:
+
+    >>> q.put('c'*1000)
+    >>> q.put('d'*1000)
+    >>> q.put('e'*1000)
+    >>> q.qsize()
+    5000
+
+without blocking.  We can verify that the ByteSizedQueue only has 1000
+bytes by cheating. :)
+
+    >>> q.queue.queue.qsize()
+    1000
+
+The rest is in the file buffer.  Let's drain the queue, to verify we
+didn't lose anything.
+
+    >>> q.get() == 'a'*1000
+    True
+    >>> q.get() == 'b'*1000
+    True
+    >>> q.get() == 'c'*1000
+    True
+    >>> q.get() == 'd'*1000
+    True
+    >>> q.get() == 'e'*1000
+    True
+    >>> q.qsize()
+    0
+
+Let's fill it up again:
+
+    >>> q.put('f'*1000)
+    >>> q.put('g'*1000)
+    >>> q.put('h'*1000)
+    >>> q.put('i'*1000)
+    >>> q.put('j'*1000)
+
+At any point, we can close the queue, which will free resources and
+empty it:
+
+    >>> q.close()
+    >>> q.qsize()
+    0
+
+Being a little white box:
+
+    >>> q.queue.file.closed
+    True
+
+Now, if we put to the queue, it will simply discard the data:
+
+    >>> q.put('a')
+    >>> q.qsize()
+    0
+
+
+Empty strings, None, and other False objects
+--------------------------------------------
+
+    >>> q = zc.resumelb.util.BufferedQueue()
+    >>> q.put('')
+    >>> q.qsize() > 0
+    True
+    >>> q.get()
+    ''
+
+    >>> q.qsize()
+    0
+
+    >>> q.put('a'*1000)
+    >>> q.qsize()
+    1000
+
+    >>> q.put('')
+    >>> q.put(None)
+    >>> q.put(())
+    >>> q.qsize()
+    1000
+
+    >>> q.get() == 'a'*1000, q.get(), q.get(), q.get()
+    (True, '', None, ())


Property changes on: zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test
___________________________________________________________________
Added: svn:eol-style
   + native
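
The bufferedqueue.test doctest above exercises the spill-to-disk behavior.  As a rough, single-threaded, stdlib-only sketch of the same idea (no gevent; ``SpillQueue`` and its ``max_bytes`` budget are illustrative names, not the real API, and the real BufferedQueue feeds spilled items back through a greenlet rather than on ``get``):

```python
import marshal
import tempfile
from collections import deque

class SpillQueue:
    """Sketch: puts never block; once the in-memory byte budget is
    exceeded, further items are marshal-dumped to a temp file."""

    def __init__(self, max_bytes=999):
        self.max_bytes = max_bytes
        self.mem = deque()           # in-memory portion (FIFO head)
        self.mem_bytes = 0           # bytes currently held in memory
        self.file = None             # created lazily on first spill
        self.read_pos = self.write_pos = 0
        self.spilled = 0             # items currently in the file

    def put(self, data):
        # Mirror ByteSizedQueue: accept in memory until *over* budget.
        if self.file is None and self.mem_bytes <= self.max_bytes:
            self.mem.append(data)
            self.mem_bytes += len(data) if data else 0
            return
        if self.file is None:
            self.file = tempfile.TemporaryFile()
        self.file.seek(self.write_pos)
        marshal.dump(data, self.file)
        self.write_pos = self.file.tell()
        self.spilled += 1

    def get(self):
        if self.mem:
            data = self.mem.popleft()
            self.mem_bytes -= len(data) if data else 0
            return data
        # Drain the file portion in FIFO order.
        self.file.seek(self.read_pos)
        data = marshal.load(self.file)
        self.read_pos = self.file.tell()
        self.spilled -= 1
        return data
```

As in the doctest, the first 1000-byte put fits (the budget isn't exceeded until after it lands); the second put spills to disk, and a drain returns everything in order.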

Modified: zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test	2012-04-26 08:49:00 UTC (rev 125304)
+++ zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test	2012-04-26 18:10:27 UTC (rev 125305)
@@ -58,7 +58,47 @@
     >>> q.qsize()
     900000
 
+Empty strings
+-------------
 
+There's a corner case when a queue contains just empty strings.  At
+that point, its byte size is 0, but its queue size must be positive,
+or get will block.  For that reason, the queue will always have a
+non-zero queue size when there are items in it.
 
+    >>> q.put('')
+    >>> q.get() == 'a'*900000
+    True
+    >>> q.qsize() > 0
+    True
 
+    >>> q.put('')
+    >>> q.qsize() > 0
+    True
+    >>> q.get()
+    ''
+    >>> q.qsize() > 0
+    True
+    >>> q.get()
+    ''
+    >>> q.qsize()
+    0
 
+None
+----
+
+None is counted as an empty string:
+
+    >>> q.put(None)
+    >>> q.qsize() > 0
+    True
+    >>> q.put('hi')
+    >>> q.qsize()
+    2
+
+Actually, anything false is:
+
+    >>> q.put([])
+    >>> q.qsize()
+    2
+
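
The falsy-item rule the doctest above describes can be sketched without gevent; ``ByteCountingQueue`` is a hypothetical stand-in showing only the ``qsize()`` fallback, not the real blocking queue:

```python
class ByteCountingQueue:
    """Sketch of the qsize trick: qsize() normally reports buffered
    bytes, but falls back to the item count when every queued item is
    falsy ('' or None), so a non-empty queue never reports 0 (which
    would make a consumer block on get)."""

    def __init__(self):
        self.items = []
        self.nbytes = 0

    def put(self, item):
        self.items.append(item)
        if item:                     # '', None, [] contribute 0 bytes
            self.nbytes += len(item)

    def get(self):
        item = self.items.pop(0)
        if item:
            self.nbytes -= len(item)
        return item

    def qsize(self):
        # `or` falls through to the item count when nbytes == 0
        return self.nbytes or len(self.items)
```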

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-04-26 08:49:00 UTC (rev 125304)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py	2012-04-26 18:10:27 UTC (rev 125305)
@@ -277,6 +277,8 @@
 
     maxrno = (1<<32) - 1
 
+    ReadQueue = zc.resumelb.util.BufferedQueue
+
     def __init__(self, pool, socket, addr):
         self.pool = pool
         self.nrequest = 0

Modified: zc.resumelb/trunk/src/zc/resumelb/lb.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-04-26 08:49:00 UTC (rev 125304)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.test	2012-04-26 18:10:27 UTC (rev 125305)
@@ -177,7 +177,7 @@
     >>> write_message(worker2, 1, (response.status, response.headers.items()))
     >>> write_message(worker2, 1, response.body)
     >>> write_message(worker2, 1, '')
-    >>> g3.join()
+    >>> g3.join(1.0)
     >>> g3.value
     <200 OK text/html body='Hello world\n'>
 
@@ -195,11 +195,11 @@
     >>> write_message(worker1, 1, '')
     >>> write_message(worker1, 2, '')
 
-    >>> g1.join()
+    >>> g1.join(1.0)
     >>> g1.value.status, g1.value.body == '1'*10000
     ('200 OK', True)
 
-    >>> g2.join()
+    >>> g2.join(1.0)
     >>> g2.value.status, g2.value.body == '2'*10000
     ('200 OK', True)
 
@@ -273,7 +273,7 @@
     >>> write_message(worker1, rno, (response.status, response.headers.items()))
     >>> write_message(worker1, rno, response.body)
     >>> write_message(worker1, rno, '')
-    >>> g1.join()
+    >>> g1.join(1.0)
     >>> [ot2] = [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
     >>> ot2 > ot
     True
@@ -285,7 +285,7 @@
     >>> write_message(worker1, rno, (response.status, response.headers.items()))
     >>> write_message(worker1, rno, response.body)
     >>> write_message(worker1, rno, '')
-    >>> g2.join()
+    >>> g2.join(1.0)
 
     >>> for worker in sorted(lb.pool.workers):
     ...     print worker, worker.oldest_time
@@ -333,6 +333,7 @@
 Now, we'll disconnect worker1:
 
     >>> worker1.close()
+    >>> gevent.sleep(.1)
 
 The second GET and the HEAD request will be send to worker2:
 
@@ -358,7 +359,7 @@
     ...     write_message(worker2, rno, '')
 
     >>> for g in greenlets:
-    ...    g.join()
+    ...    g.join(1.0)
     ...    print repr(g.value)
     <200 OK text/html no body>
     <200 OK text/html body='Hello test\n'>
@@ -520,7 +521,7 @@
     ...      (response.status, response.headers.items()))
     >>> write_message(workers[-1].socket, rno, response.body)
     >>> write_message(workers[-1].socket, rno, '')
-    >>> g.join()
+    >>> g.join(1.0)
 
     >>> rno, env = read_message(workers[0].socket)
     >>> read_message(workers[0].socket)
@@ -530,7 +531,7 @@
     ...      (response.status, response.headers.items()))
     >>> write_message(workers[0].socket, rno, response.body)
     >>> write_message(workers[0].socket, rno, '')
-    >>> g2.join()
+    >>> g2.join(1.0)
 
     >>> gevent.sleep(.01)
     >>> shutdown_greenlet.ready()

Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-04-26 08:49:00 UTC (rev 125304)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-04-26 18:10:27 UTC (rev 125305)
@@ -98,10 +98,10 @@
         test, setattr, zc.resumelb.worker, 'STRING_BUFFER_SIZE', old)
     zc.resumelb.worker.STRING_BUFFER_SIZE = 9999
 
-    old = zc.resumelb.util.Worker.write_queue_size
+    old = zc.resumelb.util.queue_size_bytes
     zope.testing.setupstack.register(
-        test, setattr, zc.resumelb.util.Worker, 'write_queue_size', old)
-    zc.resumelb.util.Worker.write_queue_size = 999
+        test, setattr, zc.resumelb.util, 'queue_size_bytes', old)
+    zc.resumelb.util.queue_size_bytes = 999
 
 def zkSetUp(test):
     setUp(test)
@@ -121,6 +121,7 @@
                     ])
                 ) + manuel.capture.Manuel(),
             'lb.test', 'pool.test', 'worker.test', 'bytesizedqueue.test',
+            'bufferedqueue.test',
             setUp=setUp, tearDown=zope.testing.setupstack.tearDown),
         manuel.testing.TestSuite(
             manuel.doctest.Manuel(

Modified: zc.resumelb/trunk/src/zc/resumelb/util.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/util.py	2012-04-26 08:49:00 UTC (rev 125304)
+++ zc.resumelb/trunk/src/zc/resumelb/util.py	2012-04-26 18:10:27 UTC (rev 125305)
@@ -4,6 +4,7 @@
 import logging
 import marshal
 import socket
+import tempfile
 
 logger = logging.getLogger(__name__)
 
@@ -70,25 +71,121 @@
             multiplexer.disconnected()
             return
 
+
+queue_size_bytes = 99999
+
 class ByteSizedQueue(gevent.queue.Queue):
 
     __size = 0
 
     def _get(self):
         item = super(ByteSizedQueue, self)._get()
-        self.__size -= len(item)
+        if item:
+            self.__size -= len(item)
         return item
 
     def _put(self, item):
         super(ByteSizedQueue, self)._put(item)
-        self.__size += len(item)
+        if item:
+            self.__size += len(item)
 
     def qsize(self):
-        return self.__size
+        return self.__size or super(ByteSizedQueue, self).qsize()
 
+class BufferedQueue:
+
+    buffer = None
+
+    def __init__(self):
+        self.queue = ByteSizedQueue(queue_size_bytes)
+        self._put = self.queue.put
+        self.get = self.queue.get
+
+    def put(self, data):
+        try:
+            self._put(data, False)
+        except gevent.queue.Full:
+            self.queue = queue = Buffer(self.queue)
+            self._put = queue.put
+            self.close = queue.close
+            queue.put(data)
+
+    def qsize(self):
+        return self.queue.qsize()
+
+class Buffer:
+
+    size = size_bytes = read_position = write_position = 0
+
+    def __init__(self, queue):
+        self.queue = queue
+        self.file = tempfile.TemporaryFile(suffix='.rlbob')
+
+    def qsize(self):
+        return self.queue.qsize() + self.size_bytes
+
+    def close(self):
+        # Close the queue.  There are 2 possibilities:
+
+        # 1. The file buffer is non-empty and there's a greenlet
+        #    emptying it.  (See the feed greenlet in the put method.)
+        #    The greenlet is blocked putting data in the underlying
+        #    queue.  We can set size to -1, marking us as closed, and
+        #    close the file. The greenlet will check size before
+        #    trying to read the file again.
+
+        # 2. The file buffer is empty and there's no running greenlet.
+        #    We can set the size to -1 and close the file.
+
+        # In either case, we'll empty the underlying queue, both for
+        # cleanliness and to unblock a greenlet, if there is one, so
+        # it can die a normal death.
+
+        if self.size < 0:
+            return # already closed
+
+        self.size = -1
+        self.file.close()
+
+        queue = self.queue
+        while queue.qsize():
+            queue.get()
+        self.size_bytes = 0
+
+    def put(self, data, block=False):
+        if self.size < 0:
+            return # closed
+
+        file = self.file
+        file.seek(self.write_position)
+        marshal.dump(data, file)
+        self.write_position = file.tell()
+        if data:
+            self.size_bytes += len(data)
+        self.size += 1
+        if self.size == 1:
+
+            @gevent.spawn
+            def feed():
+                queue = self.queue
+                while self.size > 0:
+                    file.seek(self.read_position)
+                    data = marshal.load(file)
+                    self.read_position = file.tell()
+                    queue.put(data)
+                    if self.size > 0:
+                        # We check the size here, in case the queue was closed
+                        if data:
+                            self.size_bytes -= len(data)
+                        self.size -= 1
+                    else:
+                        assert self.size == -1
+
+
+
 class Worker:
 
-    write_queue_size = 99999
+    ReadQueue = gevent.queue.Queue
 
     def connected(self, socket, addr=None):
         if addr is None:
@@ -96,7 +193,7 @@
         logger.info('worker connected %s', addr)
         self.addr = addr
         self.readers = {}
-        writeq = ByteSizedQueue(self.write_queue_size)
+        writeq = ByteSizedQueue(queue_size_bytes)
         gevent.Greenlet.spawn(writer, writeq, socket, self)
         self.put = writeq.put
         self.is_connected = True
@@ -106,15 +203,17 @@
         return len(self.readers)
 
     def start(self, rno):
-        readq = gevent.queue.Queue()
+        readq = self.ReadQueue()
         self.readers[rno] = readq.put
         return readq.get
 
     def end(self, rno):
         try:
-            del self.readers[rno]
+            queue = self.readers.pop(rno)
         except KeyError:
-            pass # previously cancelled
+            return # previously cancelled
+        if hasattr(queue, 'close'):
+            queue.close()
 
     def put_disconnected(self, *a, **k):
         raise Disconnected()
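
The end-of-request change above closes read queues that support it.  A rough stdlib-only sketch of that lifecycle (no gevent; ``CloseableQueue`` is a hypothetical stand-in for BufferedQueue, and unlike the diff, which registers ``readq.put`` in ``readers``, this stores the queue itself for simplicity):

```python
import queue

class CloseableQueue(queue.Queue):
    """Hypothetical queue that frees resources on close, standing in
    for BufferedQueue once it has overflowed to disk."""
    closed = False

    def close(self):
        self.closed = True

class Handler:
    # The queue class is pluggable; the LB swaps in a closeable queue.
    ReadQueue = queue.Queue

    def __init__(self):
        self.readers = {}

    def start(self, rno):
        readq = self.ReadQueue()
        self.readers[rno] = readq
        return readq.get

    def end(self, rno):
        try:
            readq = self.readers.pop(rno)
        except KeyError:
            return  # previously cancelled
        if hasattr(readq, 'close'):  # plain Queues have no close()
            readq.close()
```

A second ``end`` for the same request number is a no-op, matching the "previously cancelled" branch in the diff.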


