[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Changed output queues to be bytes-, rather than message-limited.

jim cvs-admin at zope.org
Wed Apr 25 18:59:41 UTC 2012


Log message for revision 125293:
  Changed output queues to be bytes-, rather than message-limited.
  

Changed:
  A   zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test
  U   zc.resumelb/trunk/src/zc/resumelb/tests.py
  U   zc.resumelb/trunk/src/zc/resumelb/util.py
  U   zc.resumelb/trunk/src/zc/resumelb/worker.test

-=-
Added: zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test	                        (rev 0)
+++ zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test	2012-04-25 18:59:38 UTC (rev 125293)
@@ -0,0 +1,64 @@
+byte-sized queues
+=================
+
+Internally, we have greenlets that handle communication over the
+multiplexed lb<->worker socket and pass data to and from request
+handlers::
+
+              - read queue <-- read greenlet <-
+             /                                  \
+   handler <-                   -----------> socket
+      \                       /
+        -> write queue -> write greenlet
+
+Read queues are per worker.  The write queue is shared.  The read
+greenlet reads messages from the socket. Each message has a request
+number, which is used to look up a handler read queue.
+
+We limit write queue size to prevent storing lots of data in memory.
+When no more data can be written to the socket, we want to block
+handlers from adding more data to the queue.
+
+It's important though to limit by bytes, rather than by message count.
+We use ByteSizedQueue for this:
+
+    >>> import zc.resumelb.util
+    >>> q = zc.resumelb.util.ByteSizedQueue(999999)
+
+    >>> q.put('a'*900000, False)
+    >>> q.put('b'*99990, False)
+    >>> q.put('c'*99990, False)
+    >>> q.put('d'*99990, False)
+    Traceback (most recent call last):
+    ...
+    Full
+
+    >>> q.qsize()
+    1099980
+
+Note that the queue size is actually greater than the max size.  We
+don't raise Full unless the size is greater than or equal to the max
+size before adding an item.
+
+    >>> q.get() == 'a'*900000
+    True
+    >>> q.put('d'*99990, False)
+    >>> q.put('a'*900000, False)
+    >>> q.put('x'*900000, False)
+    Traceback (most recent call last):
+    ...
+    Full
+
+    >>> q.get() == 'b'*99990
+    True
+    >>> q.get() == 'c'*99990
+    True
+    >>> q.get() == 'd'*99990
+    True
+    >>> q.qsize()
+    900000
+
+
+
+
+


Property changes on: zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-04-25 18:57:52 UTC (rev 125292)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py	2012-04-25 18:59:38 UTC (rev 125293)
@@ -24,6 +24,7 @@
 import time
 import unittest
 import webob
+import zc.resumelb.util
 import zc.resumelb.worker
 import zc.zk.testing
 import zope.testing.setupstack
@@ -92,12 +93,16 @@
     global pid
     pid = 6115
     test.globs['wait'] = zope.testing.wait.Wait(getsleep=lambda : gevent.sleep)
-    old_STRING_BUFFER_SIZE = zc.resumelb.worker.STRING_BUFFER_SIZE
+    old = zc.resumelb.worker.STRING_BUFFER_SIZE
     zope.testing.setupstack.register(
-        test, setattr, zc.resumelb.worker,
-        'STRING_BUFFER_SIZE', old_STRING_BUFFER_SIZE)
+        test, setattr, zc.resumelb.worker, 'STRING_BUFFER_SIZE', old)
     zc.resumelb.worker.STRING_BUFFER_SIZE = 9999
 
+    old = zc.resumelb.util.Worker.write_queue_size
+    zope.testing.setupstack.register(
+        test, setattr, zc.resumelb.util.Worker, 'write_queue_size', old)
+    zc.resumelb.util.Worker.write_queue_size = 999
+
 def zkSetUp(test):
     setUp(test)
     zc.zk.testing.setUp(test)
@@ -115,7 +120,7 @@
                     (re.compile(r'127.0.0.1:\d+'), '127.0.0.1:0'),
                     ])
                 ) + manuel.capture.Manuel(),
-            'lb.test', 'pool.test', 'worker.test',
+            'lb.test', 'pool.test', 'worker.test', 'bytesizedqueue.test',
             setUp=setUp, tearDown=zope.testing.setupstack.tearDown),
         manuel.testing.TestSuite(
             manuel.doctest.Manuel(

Modified: zc.resumelb/trunk/src/zc/resumelb/util.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/util.py	2012-04-25 18:57:52 UTC (rev 125292)
+++ zc.resumelb/trunk/src/zc/resumelb/util.py	2012-04-25 18:59:38 UTC (rev 125293)
@@ -70,15 +70,33 @@
             multiplexer.disconnected()
             return
 
+class ByteSizedQueue(gevent.queue.Queue):
+
+    __size = 0
+
+    def _get(self):
+        item = super(ByteSizedQueue, self)._get()
+        self.__size -= len(item)
+        return item
+
+    def _put(self, item):
+        super(ByteSizedQueue, self)._put(item)
+        self.__size += len(item)
+
+    def qsize(self):
+        return self.__size
+
 class Worker:
 
+    write_queue_size = 99999
+
     def connected(self, socket, addr=None):
         if addr is None:
             addr = socket.getpeername()
         logger.info('worker connected %s', addr)
         self.addr = addr
         self.readers = {}
-        writeq = gevent.queue.Queue(9)
+        writeq = ByteSizedQueue(self.write_queue_size)
         gevent.Greenlet.spawn(writer, writeq, socket, self)
         self.put = writeq.put
         self.is_connected = True

Modified: zc.resumelb/trunk/src/zc/resumelb/worker.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/worker.test	2012-04-25 18:57:52 UTC (rev 125292)
+++ zc.resumelb/trunk/src/zc/resumelb/worker.test	2012-04-25 18:59:38 UTC (rev 125293)
@@ -524,7 +524,7 @@
 
 Or requesting tons of data, but before receiving all of it:
 
-    >>> env = newenv('test', '/gen.html?size=1000')
+    >>> env = newenv('test', '/gen.html?size=10000')
     >>> write_message(worker_socket, 7, env, '')
     >>> gevent.sleep(.01)
     >>> write_message(worker_socket, 7, env, None)
@@ -540,7 +540,7 @@
     >>> env = newenv('test', '/hi.html')
     >>> write_message(worker_socket, 8, env, '')
 
-Now, we'll read data until we get a response record for request 7:
+Now, we'll read data until we get a response record for request 8:
 
     >>> rno = 0
     >>> while rno != 8:



More information about the checkins mailing list