[Checkins] SVN: zc.ngi/trunk/async_txt.py Added code from tests.test_async_cannot_connect

Jim Fulton jim at zope.com
Mon Oct 2 07:10:38 EDT 2006


Log message for revision 70467:
  Added code from tests.test_async_cannot_connect
  

Changed:
  U   zc.ngi/trunk/async_txt.py

-=-
Modified: zc.ngi/trunk/async_txt.py
===================================================================
--- zc.ngi/trunk/async_txt.py	2006-10-01 20:55:48 UTC (rev 70466)
+++ zc.ngi/trunk/async_txt.py	2006-10-02 11:10:37 UTC (rev 70467)
@@ -16,6 +16,7 @@
 
 import zc.ngi.wordcount
 import zc.ngi.async
+import zc.ngi.testing
 port = zc.ngi.wordcount.start_server_process(zc.ngi.async.listener)
 
 ## We passed the module and name of the listener to be used.
@@ -34,3 +35,30 @@
 _ = [thread.join() for thread in threads]
 
 zc.ngi.wordcount.stop_server_process(zc.ngi.async.connector, addr)
+
+## Let's make sure that the connector handles connection failures correctly
+
+import threading
+lock = threading.Lock()
+_ = lock.acquire()
+
+##     We define a simple handler that just notifies of failed connections.
+
+class Handler:
+    def failed_connect(connection, reason):
+        print 'failed', reason
+        lock.release()
+
+def connect(addr):
+    zc.ngi.async.connector(addr, Handler())
+    lock.acquire()
+
+##     We find an unused port (so when we connect to it, the connection
+##     will fail).
+
+port = zc.ngi.testing.get_port()
+
+##     Now let's try to connect
+
+connect(('localhost', port))
+##     failed connection failed



More information about the Checkins mailing list