[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Changed output queues to be bytes-, rather than message-limited.
jim
cvs-admin at zope.org
Wed Apr 25 18:59:41 UTC 2012
Log message for revision 125293:
Changed output queues to be bytes-, rather than message-limited.
Changed:
A zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test
U zc.resumelb/trunk/src/zc/resumelb/tests.py
U zc.resumelb/trunk/src/zc/resumelb/util.py
U zc.resumelb/trunk/src/zc/resumelb/worker.test
-=-
Added: zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test (rev 0)
+++ zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test 2012-04-25 18:59:38 UTC (rev 125293)
@@ -0,0 +1,64 @@
+byte-sized queues
+=================
+
+Internally, we have greenlets that handle communication over the
+multiplexed lb<->worker socket and pass data to and from request
+handlers::
+
+ - read queue <-- read greenlet <-
+ / \
+ handler <- -----------> socket
+ \ /
+ -> write queue -> write greenled
+
+Read queues are per worker. The write queue is shared. The read
+greenlet reads messages from the socket. Each message has a request
+number, which is used to look up a handler read queue.
+
+We limit write queue size to prevent storing lots of data in memory.
+When no more data can be written to the socket, we want to block
+handlers from adding more data to the queue.
+
+It's important though to limit by bytes, rather than by message count.
+We use ByteSizedQueue for this:
+
+ >>> import zc.resumelb.util
+ >>> q = zc.resumelb.util.ByteSizedQueue(999999)
+
+ >>> q.put('a'*900000, False)
+ >>> q.put('b'*99990, False)
+ >>> q.put('c'*99990, False)
+ >>> q.put('d'*99990, False)
+ Traceback (most recent call last):
+ ...
+ Full
+
+ >>> q.qsize()
+ 1099980
+
+Note that the queue size is actually greater than the max size. We
+don't raise full unless the size is greater than or equal to the max
+size before adding an item.
+
+ >>> q.get() == 'a'*900000
+ True
+ >>> q.put('d'*99990, False)
+ >>> q.put('a'*900000, False)
+ >>> q.put('x'*900000, False)
+ Traceback (most recent call last):
+ ...
+ Full
+
+ >>> q.get() == 'b'*99990
+ True
+ >>> q.get() == 'c'*99990
+ True
+ >>> q.get() == 'd'*99990
+ True
+ >>> q.qsize()
+ 900000
+
+
+
+
+
Property changes on: zc.resumelb/trunk/src/zc/resumelb/bytesizedqueue.test
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-04-25 18:57:52 UTC (rev 125292)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-04-25 18:59:38 UTC (rev 125293)
@@ -24,6 +24,7 @@
import time
import unittest
import webob
+import zc.resumelb.util
import zc.resumelb.worker
import zc.zk.testing
import zope.testing.setupstack
@@ -92,12 +93,16 @@
global pid
pid = 6115
test.globs['wait'] = zope.testing.wait.Wait(getsleep=lambda : gevent.sleep)
- old_STRING_BUFFER_SIZE = zc.resumelb.worker.STRING_BUFFER_SIZE
+ old = zc.resumelb.worker.STRING_BUFFER_SIZE
zope.testing.setupstack.register(
- test, setattr, zc.resumelb.worker,
- 'STRING_BUFFER_SIZE', old_STRING_BUFFER_SIZE)
+ test, setattr, zc.resumelb.worker, 'STRING_BUFFER_SIZE', old)
zc.resumelb.worker.STRING_BUFFER_SIZE = 9999
+ old = zc.resumelb.util.Worker.write_queue_size
+ zope.testing.setupstack.register(
+ test, setattr, zc.resumelb.util.Worker, 'write_queue_size', old)
+ zc.resumelb.util.Worker.write_queue_size = 999
+
def zkSetUp(test):
setUp(test)
zc.zk.testing.setUp(test)
@@ -115,7 +120,7 @@
(re.compile(r'127.0.0.1:\d+'), '127.0.0.1:0'),
])
) + manuel.capture.Manuel(),
- 'lb.test', 'pool.test', 'worker.test',
+ 'lb.test', 'pool.test', 'worker.test', 'bytesizedqueue.test',
setUp=setUp, tearDown=zope.testing.setupstack.tearDown),
manuel.testing.TestSuite(
manuel.doctest.Manuel(
Modified: zc.resumelb/trunk/src/zc/resumelb/util.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/util.py 2012-04-25 18:57:52 UTC (rev 125292)
+++ zc.resumelb/trunk/src/zc/resumelb/util.py 2012-04-25 18:59:38 UTC (rev 125293)
@@ -70,15 +70,33 @@
multiplexer.disconnected()
return
+class ByteSizedQueue(gevent.queue.Queue):
+
+ __size = 0
+
+ def _get(self):
+ item = super(ByteSizedQueue, self)._get()
+ self.__size -= len(item)
+ return item
+
+ def _put(self, item):
+ super(ByteSizedQueue, self)._put(item)
+ self.__size += len(item)
+
+ def qsize(self):
+ return self.__size
+
class Worker:
+ write_queue_size = 99999
+
def connected(self, socket, addr=None):
if addr is None:
addr = socket.getpeername()
logger.info('worker connected %s', addr)
self.addr = addr
self.readers = {}
- writeq = gevent.queue.Queue(9)
+ writeq = ByteSizedQueue(self.write_queue_size)
gevent.Greenlet.spawn(writer, writeq, socket, self)
self.put = writeq.put
self.is_connected = True
Modified: zc.resumelb/trunk/src/zc/resumelb/worker.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/worker.test 2012-04-25 18:57:52 UTC (rev 125292)
+++ zc.resumelb/trunk/src/zc/resumelb/worker.test 2012-04-25 18:59:38 UTC (rev 125293)
@@ -524,7 +524,7 @@
Or requesting tons of data, but before receiving all of it:
- >>> env = newenv('test', '/gen.html?size=1000')
+ >>> env = newenv('test', '/gen.html?size=10000')
>>> write_message(worker_socket, 7, env, '')
>>> gevent.sleep(.01)
>>> write_message(worker_socket, 7, env, None)
@@ -540,7 +540,7 @@
>>> env = newenv('test', '/hi.html')
>>> write_message(worker_socket, 8, env, '')
-Now, we'll read data until we get a response record for request 7:
+Now, we'll read data until we get a response record for request 8:
>>> rno = 0
>>> while rno != 8:
More information about the checkins
mailing list