[Checkins] SVN: zc.ngi/trunk/src/zc/ngi/ Fixed a bug in handling connections. I/O was done in the thread

Jim Fulton jim at zope.com
Tue Sep 26 12:16:30 EDT 2006


Log message for revision 70390:
  Fixed a bug in handling connections.  I/O was done in the thread
  initiating the connection.  This is a no no. Only the main-loop thread
  should do I/O.  This, and differences in the way select for connecting
  sockets work on Linux (relative to Mac OS X) caused connections to
  fail in some cases.  Ultimately, this was a threading bug as the code
  expects all I/O and handler calls to be done from the loop thread.
  
  Added exception handlers to prevent broken application handlers from
  bringing down the loop thread.
  

Changed:
  U   zc.ngi/trunk/src/zc/ngi/async.py
  U   zc.ngi/trunk/src/zc/ngi/tests.py

-=-
Modified: zc.ngi/trunk/src/zc/ngi/async.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.py	2006-09-26 16:16:27 UTC (rev 70389)
+++ zc.ngi/trunk/src/zc/ngi/async.py	2006-09-26 16:16:29 UTC (rev 70390)
@@ -31,6 +31,7 @@
 pid = os.getpid()
 
 _map = {}
+_connectors = {}
 
 expected_socket_read_errors = {
     errno.EWOULDBLOCK: 0,
@@ -47,14 +48,18 @@
 
 class dispatcher(asyncore.dispatcher):
 
-    def __init__(self, sock, addr):
+    def __init__(self, sock, addr, map=_map):
         self.addr = addr
-        asyncore.dispatcher.__init__(self, sock, _map)
+        asyncore.dispatcher.__init__(self, sock, map)
 
     def handle_error(self):
         reason = sys.exc_info()[1]
         self.logger.exception('handle_error')
-        self.handle_close(reason)
+        try:
+            self.handle_close(reason)
+        except:
+            self.logger.exception("Exception raised by handle_close(%r)",
+                                  reason)
         self.close()
 
     def close(self):
@@ -199,17 +204,20 @@
         else:
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
+        dispatcher.__init__(self, sock, addr, _connectors)
+
+        notify_select()
+
+    def connect(self):
         if __debug__:
-            self.logger.debug('connecting to %s', addr)
-        dispatcher.__init__(self, sock, addr)
+            self.logger.debug('connecting to %s', self.addr)
+
+        self.add_channel(_map)
         try:
-            if self.handle_write_event():
-                return
+            self.handle_write_event()
         except:
             self.handle_error()
 
-        notify_select()
-
     def readable(self):
         return False
 
@@ -236,14 +244,20 @@
         self.del_channel(_map)
         if __debug__:
             self.logger.debug('outgoing connected %r', self.addr)
-        self.__handler.connected(
-            _Connection(self.socket, self.addr, self.logger))
+
+        connection = _Connection(self.socket, self.addr, self.logger)
+        self.__handler.connected(connection)
         return
 
     def handle_error(self):
         reason = sys.exc_info()[1]
         self.logger.exception('connect error')
-        self.__handler.failed_connect(reason)
+        try:
+            self.__handler.failed_connect(reason)
+        except:
+            self.logger.exception(
+                "Handler failed_connect(%s) raised an exception", reason,
+                )
         self.close()
 
     def handle_expt(self):
@@ -300,7 +314,7 @@
         self.del_channel(_map)
         self.socket.close()
         if handler is None:
-            for c in list(self._connections):
+            for c in list(self.__connections):
                 c.handle_close("stopped")
         elif not self.__connections:
             handler(self)
@@ -438,6 +452,7 @@
 def loop():
     timeout = 30.0
     map = _map
+    connectors = _connectors
     if hasattr(select, 'poll'):
         poll_fun = asyncore.poll3
     else:
@@ -446,6 +461,10 @@
     logger = logging.getLogger('zc.ngi.async.loop')
 
     while map:
+        for f in list(connectors):
+            c = connectors.pop(f)
+            c.connect()
+            
         try:
             poll_fun(timeout, map)
         except:

Modified: zc.ngi/trunk/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/tests.py	2006-09-26 16:16:27 UTC (rev 70389)
+++ zc.ngi/trunk/src/zc/ngi/tests.py	2006-09-26 16:16:29 UTC (rev 70390)
@@ -15,10 +15,11 @@
 
 $Id$
 """
-import unittest
+import threading, unittest
 from zope.testing import doctest
 import zc.ngi.testing
-import zc.ngi.async # start async thread before tests run
+import zc.ngi.async
+import zc.ngi.wordcount
 
 def test_async_cannot_connect():
     """Let's make sure that the connector handles connection failures correctly
@@ -50,14 +51,68 @@
     
     """
 
+class BrokenConnect:
+
+    connected = failed_connect = __call__ = lambda: xxxxx
+    
+class BrokenAfterConnect:
+
+    def connected(self, connection):
+        connection.write("Hee hee\0")
+        connection.setHandler(self)
+
+    __call__ = connected
+
+    handle_input = handle_close = lambda: xxxxx
+
+def async_evil_setup(test):
+
+    # Uncomment the next 2 lines to check that a bunch of lambda type
+    # errors are logged.
+    #import logging
+    #logging.getLogger().addHandler(logging.StreamHandler())
+    
+    # See if we can break the main loop before running the async test
+    
+    # Connect to bad port with bad handler
+
+    port = zc.ngi.wordcount.get_port()
+    addr = 'localhost', port
+    zc.ngi.async.connector(addr, BrokenConnect())
+
+    # Start the server and connect to a good port with a bad handler
+
+    port = zc.ngi.wordcount.start_server_process(zc.ngi.async.listener)
+    addr = 'localhost', port
+    zc.ngi.async.connector(addr, BrokenAfterConnect())
+
+    # Stop the server
+    zc.ngi.wordcount.stop_server_process(zc.ngi.async.connector, addr)
+
+    # Create a lister with a broken server and connect to it
+    port = zc.ngi.wordcount.get_port()
+    addr = 'localhost', port
+    zc.ngi.async.listener(addr, BrokenConnect())
+    zc.ngi.async.connector(addr, BrokenAfterConnect())
+
+    # Create a lister with a broken Server handler and connect to it
+    port = zc.ngi.wordcount.get_port()
+    addr = 'localhost', port
+    zc.ngi.async.listener(addr, BrokenAfterConnect())
+    zc.ngi.async.connector(addr, BrokenAfterConnect())
+
+    
 def test_suite():
     return unittest.TestSuite([
         doctest.DocFileSuite(
             'README.txt',
             'message.txt',
-            'async.txt',
             'adapters.txt',
             ),
+        doctest.DocFileSuite(
+            'async.txt',
+            setUp=async_evil_setup,
+            ),
         doctest.DocTestSuite(),
         ])
 



More information about the Checkins mailing list