[Checkins] SVN: zc.ngi/trunk/async_txt.py Added code from
tests.test_async_cannot_connect
Jim Fulton
jim at zope.com
Mon Oct 2 07:10:38 EDT 2006
Log message for revision 70467:
Added code from tests.test_async_cannot_connect
Changed:
U zc.ngi/trunk/async_txt.py
-=-
Modified: zc.ngi/trunk/async_txt.py
===================================================================
--- zc.ngi/trunk/async_txt.py 2006-10-01 20:55:48 UTC (rev 70466)
+++ zc.ngi/trunk/async_txt.py 2006-10-02 11:10:37 UTC (rev 70467)
@@ -16,6 +16,7 @@
import zc.ngi.wordcount
import zc.ngi.async
+import zc.ngi.testing
port = zc.ngi.wordcount.start_server_process(zc.ngi.async.listener)
## We passed the module and name of the listener to be used.
@@ -34,3 +35,30 @@
_ = [thread.join() for thread in threads]
zc.ngi.wordcount.stop_server_process(zc.ngi.async.connector, addr)
+
+## Let's make sure that the connector handles connection failures correctly
+
+import threading
+lock = threading.Lock()
+_ = lock.acquire()
+
+## We define a simple handler that just notifies of failed connectioons.
+
+class Handler:
+ def failed_connect(connection, reason):
+ print 'failed', reason
+ lock.release()
+
+def connect(addr):
+ zc.ngi.async.connector(addr, Handler())
+ lock.acquire()
+
+## We find an unused port (so when we connect to it, the connection
+## will fail).
+
+port = zc.ngi.testing.get_port()
+
+## Now let's try to connect
+
+connect(('localhost', port))
+## failed connection failed
More information about the Checkins
mailing list