[Zope3-checkins] CVS: Zope3/src/zope/fssync/tests - test_network.py:1.5

Guido van Rossum guido@python.org
Wed, 28 May 2003 13:52:44 -0400


Update of /cvs-repository/Zope3/src/zope/fssync/tests
In directory cvs.zope.org:/tmp/cvs-serv30913

Modified Files:
	test_network.py 
Log Message:
OK, I caved.  test_httpreq() was taking several seconds, doing a DNS
lookup of python.org and then connecting to it.  So now there's a
dummy server that's accessed via localhost (127.0.0.1), and it takes
around 0.7 seconds.


=== Zope3/src/zope/fssync/tests/test_network.py 1.4 => 1.5 ===
--- Zope3/src/zope/fssync/tests/test_network.py:1.4	Wed May 28 10:40:04 2003
+++ Zope3/src/zope/fssync/tests/test_network.py	Wed May 28 13:52:44 2003
@@ -17,7 +17,10 @@
 """
 
 import os
+import select
+import socket
 import unittest
+import threading
 
 from StringIO import StringIO
 
@@ -28,6 +31,59 @@
 
 sample_rooturl = "http://user:passwd@host:8080/path"
 
+HOST = "127.0.0.1"     # localhost
+PORT = 60841           # random number
+RESPONSE = """HTTP/1.0 404 Not found\r
+Content-type: text/plain\r
+Content-length: 0\r
+\r
+"""
+
+class DummyServer(threading.Thread):
+
+    """A server that can handle one HTTP request (returning a 404 error)."""
+
+    stopping = False
+
+    def run(self):
+        svr = socket.socket()
+        svr.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        svr.bind((HOST, PORT))
+        svr.listen(1)
+        conn = None
+        sent_response = False
+        while not self.stopping:
+            if conn is None:
+                r = [svr]
+            else:
+                r = [conn]
+            r, w, x = select.select(r, [], [], 0.01)
+            if not r:
+                continue
+            s = r[0]
+            if s is svr:
+                conn, addr = svr.accept()
+                ##print "connect from", `addr`
+            else:
+                assert s is conn
+                data = conn.recv(1000)
+                ##print "received", `data`
+                if not data:
+                    break
+                if not sent_response:
+                    conn.send(RESPONSE)
+                    conn.close()
+                    conn = None
+                    sent_response = True
+        if conn is not None:
+            conn.close()
+        svr.close()
+        ##print "stopped"
+        
+    def stop(self):
+        ##print "stopping"
+        self.stopping = True
+
 class TestNetwork(TempFiles):
 
     def setUp(self):
@@ -97,11 +153,13 @@
         self.assertEqual(new.rooturl, sample_rooturl)
 
     def test_httpreq(self):
-        # XXX I don't want to write up a dummy server just to test
-        # this so I'll just send a request to python.org that I know
-        # will fail.
-        self.network.setrooturl("http://python.org")
-        self.assertRaises(Error, self.network.httpreq, "/xyzzy", "@@view")
+        svr = DummyServer()
+        svr.start()
+        try:
+            self.network.setrooturl("http://%s:%s" % (HOST, PORT))
+            self.assertRaises(Error, self.network.httpreq, "/xyzzy", "@@view")
+        finally:
+            svr.stop()
 
     def test_slurptext_html(self):
         fp = StringIO("<p>This is some\n\ntext.</p>\n")