[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Added worker tests for request cancellation.

Jim Fulton jim at zope.com
Sat Feb 4 22:28:13 UTC 2012


Log message for revision 124305:
  Added worker tests for request cancellation.
  
  Added logic to detect cancellation after getting all request data.
  

Changed:
  U   Sandbox/J1m/resumelb/src/zc/resumelb/util.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.test

-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/util.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/util.py	2012-02-03 22:22:02 UTC (rev 124304)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/util.py	2012-02-04 22:28:12 UTC (rev 124305)
@@ -70,7 +70,7 @@
         logger.info('worker connected %s', addr)
         self.addr = addr
         self.readers = {}
-        writeq = gevent.queue.Queue()
+        writeq = gevent.queue.Queue(9)
         gevent.Greenlet.spawn(writer, writeq, socket, self)
         self.put = writeq.put
         self.is_connected = True
@@ -85,7 +85,10 @@
         return readq.get
 
     def end(self, rno):
-        del self.readers[rno]
+        try:
+            del self.readers[rno]
+        except KeyError:
+            pass # previously cancelled
 
     def put_disconnected(self, *a, **k):
         raise Disconnected()

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-02-03 22:22:02 UTC (rev 124304)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-02-04 22:28:12 UTC (rev 124305)
@@ -69,12 +69,16 @@
 
                 rput = readers.get(rno)
                 if rput is None:
-                    env = data
-                    env['zc.resumelb.time'] = time.time()
-                    env['zc.resumelb.lb_addr'] = addr
-                    gevent.spawn(self.handle, conn, rno, conn.start(rno), env)
+                    if data:
+                        env = data
+                        env['zc.resumelb.time'] = time.time()
+                        env['zc.resumelb.lb_addr'] = addr
+                        gevent.spawn(
+                            self.handle, conn, rno, conn.start(rno), env)
                 else:
                     rput(data)
+                    if data is None:
+                        del readers[rno]
         except:
             error('handle_connection')
 
@@ -102,9 +106,14 @@
                 response[0] = (status, headers)
 
             try:
+                requests = conn.readers
                 body = self.apply(self.app, (env, start_response))
+                if rno not in requests:
+                    return # cancelled
                 conn.put((rno, response[0]))
                 for data in body:
+                    if rno not in requests:
+                        return # cancelled
                     if data:
                         conn.put((rno, data))
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2012-02-03 22:22:02 UTC (rev 124304)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.test	2012-02-04 22:28:12 UTC (rev 124305)
@@ -417,4 +417,76 @@
     <BLANKLINE>
     hello world
 
+
+Cancellation
+------------
+
+The load balancer can cancel a request at various stages.
+
+If we send a cancelation after a request has been finalized, the
+cancellation is ignored:
+
+    >>> write_message(worker_socket, 3, None)
+    >>> gevent.sleep(.01)
+
+We can cancel a request before sending input data:
+
+    >>> env = newenv('test', '/hi.html', method='POST', body='xxx')
+    >>> write_message(worker_socket, 4, env)
+    >>> gevent.sleep(.01)
+    >>> write_message(worker_socket, 4, None)
+
+Or after sending some data:
+
+    >>> write_message(worker_socket, 5, env, 'x')
+    >>> gevent.sleep(.01)
+    >>> write_message(worker_socket, 5, None)
+
+Or after sending a complete request, but so that the worker will see
+cancellation before rending a response, because the app takes some time:
+
+    >>> env = newenv('test', '/gsleep.html?dur=.1')
+    >>> write_message(worker_socket, 6, env, '', None)
+
+Or requesting tons of data, but before recieving all of it:
+
+    >>> env = newenv('test', '/gen.html?size=1000')
+    >>> write_message(worker_socket, 7, env, '')
+    >>> gevent.sleep(.01)
+    >>> write_message(worker_socket, 7, env, None)
+
+OK, so we sent 3 requests and cancelled all of them.  We shouldn't see
+any output for 4 and 5, as they were cancelled completed before they
+were completed.  Request 6 will be performed. Some output will be
+probably be written to the socket before the cancellation is seen, but
+not all of it.
+
+We'll send another request to act as a marker in the output we get
+back:
+
+    >>> env = newenv('test', '/hi.html')
+    >>> write_message(worker_socket, 8, env, '')
+
+Now, we'll read data until we get a response record for request 7:
+
+    >>> rno = 0
+    >>> while rno != 8:
+    ...     rno, data = read_message(worker_socket)
+    ...     print rno, repr(data)[:60]
+    ...     # doctest: +ELLIPSIS
+    7 ('200 OK', [('Content-Type', 'text/html; charset=UTF-8'), ('
+    ...
+    7 'hello world\nhello world\nhello world\nhello world\nhello w
+    8 ('200 OK', [('Content-Type', 'text/html; charset=UTF-8; char
+
+Note that we didn't get output for requests 4, 5 or 6, because they
+were cancelled before they were started or before they started sending
+a response. We got some, but not all of the data for request 7.  We
+know we didn't get it all because we didn't get the end of-of-request
+marker.
+
+cleanup
+--------------------------------------------------------------------
+
+
     >>> worker.stop()



More information about the checkins mailing list