[Checkins] SVN: zc.ngi/trunk/src/zc/ngi/ Fixed a bug in handling
connections. I/O was done in the thread
Jim Fulton
jim at zope.com
Tue Sep 26 12:16:30 EDT 2006
Log message for revision 70390:
Fixed a bug in handling connections. I/O was done in the thread
initiating the connection. This is a no-no: only the main-loop thread
should do I/O. This, and differences in the way select works for
connecting sockets on Linux (relative to Mac OS X), caused connections
to fail in some cases. Ultimately, this was a threading bug, as the code
expects all I/O and handler calls to be done from the loop thread.
Added exception handlers to prevent broken application handlers from
bringing down the loop thread.
Changed:
U zc.ngi/trunk/src/zc/ngi/async.py
U zc.ngi/trunk/src/zc/ngi/tests.py
-=-
Modified: zc.ngi/trunk/src/zc/ngi/async.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.py 2006-09-26 16:16:27 UTC (rev 70389)
+++ zc.ngi/trunk/src/zc/ngi/async.py 2006-09-26 16:16:29 UTC (rev 70390)
@@ -31,6 +31,7 @@
pid = os.getpid()
_map = {}
+_connectors = {}
expected_socket_read_errors = {
errno.EWOULDBLOCK: 0,
@@ -47,14 +48,18 @@
class dispatcher(asyncore.dispatcher):
- def __init__(self, sock, addr):
+ def __init__(self, sock, addr, map=_map):
self.addr = addr
- asyncore.dispatcher.__init__(self, sock, _map)
+ asyncore.dispatcher.__init__(self, sock, map)
def handle_error(self):
reason = sys.exc_info()[1]
self.logger.exception('handle_error')
- self.handle_close(reason)
+ try:
+ self.handle_close(reason)
+ except:
+ self.logger.exception("Exception raised by handle_close(%r)",
+ reason)
self.close()
def close(self):
@@ -199,17 +204,20 @@
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ dispatcher.__init__(self, sock, addr, _connectors)
+
+ notify_select()
+
+ def connect(self):
if __debug__:
- self.logger.debug('connecting to %s', addr)
- dispatcher.__init__(self, sock, addr)
+ self.logger.debug('connecting to %s', self.addr)
+
+ self.add_channel(_map)
try:
- if self.handle_write_event():
- return
+ self.handle_write_event()
except:
self.handle_error()
- notify_select()
-
def readable(self):
return False
@@ -236,14 +244,20 @@
self.del_channel(_map)
if __debug__:
self.logger.debug('outgoing connected %r', self.addr)
- self.__handler.connected(
- _Connection(self.socket, self.addr, self.logger))
+
+ connection = _Connection(self.socket, self.addr, self.logger)
+ self.__handler.connected(connection)
return
def handle_error(self):
reason = sys.exc_info()[1]
self.logger.exception('connect error')
- self.__handler.failed_connect(reason)
+ try:
+ self.__handler.failed_connect(reason)
+ except:
+ self.logger.exception(
+ "Handler failed_connect(%s) raised an exception", reason,
+ )
self.close()
def handle_expt(self):
@@ -300,7 +314,7 @@
self.del_channel(_map)
self.socket.close()
if handler is None:
- for c in list(self._connections):
+ for c in list(self.__connections):
c.handle_close("stopped")
elif not self.__connections:
handler(self)
@@ -438,6 +452,7 @@
def loop():
timeout = 30.0
map = _map
+ connectors = _connectors
if hasattr(select, 'poll'):
poll_fun = asyncore.poll3
else:
@@ -446,6 +461,10 @@
logger = logging.getLogger('zc.ngi.async.loop')
while map:
+ for f in list(connectors):
+ c = connectors.pop(f)
+ c.connect()
+
try:
poll_fun(timeout, map)
except:
Modified: zc.ngi/trunk/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/tests.py 2006-09-26 16:16:27 UTC (rev 70389)
+++ zc.ngi/trunk/src/zc/ngi/tests.py 2006-09-26 16:16:29 UTC (rev 70390)
@@ -15,10 +15,11 @@
$Id$
"""
-import unittest
+import threading, unittest
from zope.testing import doctest
import zc.ngi.testing
-import zc.ngi.async # start async thread before tests run
+import zc.ngi.async
+import zc.ngi.wordcount
def test_async_cannot_connect():
"""Let's make sure that the connector handles connection failures correctly
@@ -50,14 +51,68 @@
"""
+class BrokenConnect:
+
+ connected = failed_connect = __call__ = lambda: xxxxx
+
+class BrokenAfterConnect:
+
+ def connected(self, connection):
+ connection.write("Hee hee\0")
+ connection.setHandler(self)
+
+ __call__ = connected
+
+ handle_input = handle_close = lambda: xxxxx
+
+def async_evil_setup(test):
+
+ # Uncomment the next 2 lines to check that a bunch of lambda type
+ # errors are logged.
+ #import logging
+ #logging.getLogger().addHandler(logging.StreamHandler())
+
+ # See if we can break the main loop before running the async test
+
+ # Connect to bad port with bad handler
+
+ port = zc.ngi.wordcount.get_port()
+ addr = 'localhost', port
+ zc.ngi.async.connector(addr, BrokenConnect())
+
+ # Start the server and connect to a good port with a bad handler
+
+ port = zc.ngi.wordcount.start_server_process(zc.ngi.async.listener)
+ addr = 'localhost', port
+ zc.ngi.async.connector(addr, BrokenAfterConnect())
+
+ # Stop the server
+ zc.ngi.wordcount.stop_server_process(zc.ngi.async.connector, addr)
+
+ # Create a lister with a broken server and connect to it
+ port = zc.ngi.wordcount.get_port()
+ addr = 'localhost', port
+ zc.ngi.async.listener(addr, BrokenConnect())
+ zc.ngi.async.connector(addr, BrokenAfterConnect())
+
+ # Create a lister with a broken Server handler and connect to it
+ port = zc.ngi.wordcount.get_port()
+ addr = 'localhost', port
+ zc.ngi.async.listener(addr, BrokenAfterConnect())
+ zc.ngi.async.connector(addr, BrokenAfterConnect())
+
+
def test_suite():
return unittest.TestSuite([
doctest.DocFileSuite(
'README.txt',
'message.txt',
- 'async.txt',
'adapters.txt',
),
+ doctest.DocFileSuite(
+ 'async.txt',
+ setUp=async_evil_setup,
+ ),
doctest.DocTestSuite(),
])
More information about the Checkins
mailing list