[Checkins] SVN: zope.server/trunk/src/zope/server/http/ Make zope.server's HTTP server *really* WSGI compliant by adding variables to the environment that are required by the WSGI spec.

Philipp von Weitershausen philikon at philikon.de
Sat Jun 2 14:26:38 EDT 2007


Log message for revision 76162:
  Make zope.server's HTTP server *really* WSGI compliant by adding variables to the
  environment that are required by the WSGI spec.
  

Changed:
  U   zope.server/trunk/src/zope/server/http/tests/test_wsgiserver.py
  U   zope.server/trunk/src/zope/server/http/wsgihttpserver.py

-=-
Modified: zope.server/trunk/src/zope/server/http/tests/test_wsgiserver.py
===================================================================
--- zope.server/trunk/src/zope/server/http/tests/test_wsgiserver.py	2007-06-02 18:19:23 UTC (rev 76161)
+++ zope.server/trunk/src/zope/server/http/tests/test_wsgiserver.py	2007-06-02 18:26:38 UTC (rev 76162)
@@ -88,6 +88,37 @@
             raise Conflict
 
 
+class WSGIInfo(object):
+    """Docstring required by publisher"""
+
+    def __call__(self, REQUEST):
+        """Return a list of variables beginning with 'wsgi.'"""
+        r = []
+        for name in REQUEST.keys():
+            if name.startswith('wsgi.'):
+                r.append(name)
+        return ' '.join(r)
+
+    def version(self, REQUEST):
+        """Return WSGI version"""
+        return str(REQUEST['wsgi.version'])
+
+    def url_scheme(self, REQUEST):
+        """Return WSGI URL scheme"""
+        return REQUEST['wsgi.url_scheme']
+
+    def multithread(self, REQUEST):
+        """Return WSGI multithreadedness"""
+        return str(bool(REQUEST['wsgi.multithread']))
+
+    def multiprocess(self, REQUEST):
+        """Return WSGI multiprocessedness"""
+        return str(bool(REQUEST['wsgi.multiprocess']))
+
+    def run_once(self, REQUEST):
+        """Return whether WSGI app is invoked only once or not"""
+        return str(bool(REQUEST['wsgi.run_once']))
+
 class Tests(PlacelessSetup, unittest.TestCase):
 
     def setUp(self):
@@ -97,8 +128,8 @@
         obj = tested_object()
         obj.folder = tested_object()
         obj.folder.item = tested_object()
-
         obj._protected = tested_object()
+        obj.wsgi = WSGIInfo()
 
         pub = PublicationWithConflict(obj)
 
@@ -132,8 +163,7 @@
         while self.run_loop:
             poll(0.1, socket_map)
 
-    def testResponse(self, path='/', status_expected=200,
-                     add_headers=None, request_body=''):
+    def invokeRequest(self, path='/', add_headers=None, request_body=''):
         h = HTTPConnection(LOCALHOST, self.port)
         h.putrequest('GET', path)
         h.putheader('Accept', 'text/plain')
@@ -152,43 +182,76 @@
         else:
             response_body = ''
 
-        # Please do not disable the status code check.  It must work.
-        self.failUnlessEqual(int(response.status), status_expected)
+        self.assertEqual(length, len(response_body))
 
-        self.failUnlessEqual(length, len(response_body))
+        return response.status, response_body
 
-        if (status_expected == 200):
-            if path == '/': path = ''
-            expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
-                self.port, path)
-            self.failUnlessEqual(response_body, expect_response)
 
     def testDeeperPath(self):
-        self.testResponse(path='/folder/item')
+        status, response_body = self.invokeRequest('/folder/item')
+        self.assertEqual(status, 200)
+        expect_response = 'URL invoked: http://%s:%d/folder/item' % (
+            LOCALHOST, self.port)
+        self.assertEqual(response_body, expect_response)
 
     def testNotFound(self):
-        self.testResponse(path='/foo/bar', status_expected=404)
+        status, response_body = self.invokeRequest('/foo/bar')
+        self.assertEqual(status, 404)
 
     def testUnauthorized(self):
-        self.testResponse(path='/_protected', status_expected=401)
+        status, response_body = self.invokeRequest('/_protected')
+        self.assertEqual(status, 401)
 
     def testRedirectMethod(self):
-        self.testResponse(path='/redirect_method', status_expected=303)
+        status, response_body = self.invokeRequest('/redirect_method')
+        self.assertEqual(status, 303)
 
     def testRedirectException(self):
-        self.testResponse(path='/redirect_exception', status_expected=303)
-        self.testResponse(path='/folder/redirect_exception',
-                          status_expected=303)
+        status, response_body = self.invokeRequest('/redirect_exception')
+        self.assertEqual(status, 303)
+        status, response_body = self.invokeRequest('/folder/redirect_exception')
+        self.assertEqual(status, 303)
 
     def testConflictRetry(self):
+        status, response_body = self.invokeRequest('/conflict?wait_tries=2')
         # Expect the "Accepted" response since the retries will succeed.
-        self.testResponse(path='/conflict?wait_tries=2', status_expected=202)
+        self.assertEqual(status, 202)
 
     def testFailedConflictRetry(self):
+        status, response_body = self.invokeRequest('/conflict?wait_tries=10')
         # Expect a "Conflict" response since there will be too many
         # conflicts.
-        self.testResponse(path='/conflict?wait_tries=10', status_expected=409)
+        self.assertEqual(status, 409)
 
+    def testWSGIVariables(self):
+        # Assert that the environment contains all required WSGI variables
+        status, response_body = self.invokeRequest('/wsgi')
+        wsgi_variables = set(response_body.split())
+        self.assertEqual(wsgi_variables,
+                         set(['wsgi.version', 'wsgi.url_scheme', 'wsgi.input',
+                              'wsgi.errors', 'wsgi.multithread',
+                              'wsgi.multiprocess', 'wsgi.run_once']))
+
+    def testWSGIVersion(self):
+        status, response_body = self.invokeRequest('/wsgi/version')
+        self.assertEqual("(1, 0)", response_body)
+
+    def testWSGIURLScheme(self):
+        status, response_body = self.invokeRequest('/wsgi/url_scheme')
+        self.assertEqual('http', response_body)
+
+    def testWSGIMultithread(self):
+        status, response_body = self.invokeRequest('/wsgi/multithread')
+        self.assertEqual('True', response_body)
+
+    def testWSGIMultiprocess(self):
+        status, response_body = self.invokeRequest('/wsgi/multiprocess')
+        self.assertEqual('True', response_body)
+
+    def testWSGIRunOnce(self):
+        status, response_body = self.invokeRequest('/wsgi/run_once')
+        self.assertEqual('False', response_body)
+
     def test_server_uses_iterable(self):
         # Make sure that the task write method isn't called with a
         # str or non iterable
@@ -205,9 +268,9 @@
                 list(v)
         self.server.executeRequest(FakeTask())
 
+
 def test_suite():
-    loader = unittest.TestLoader()
-    return loader.loadTestsFromTestCase(Tests)
+    return unittest.TestSuite(unittest.makeSuite(Tests))
 
-if __name__=='__main__':
-    unittest.TextTestRunner().run(test_suite())
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')

Modified: zope.server/trunk/src/zope/server/http/wsgihttpserver.py
===================================================================
--- zope.server/trunk/src/zope/server/http/wsgihttpserver.py	2007-06-02 18:19:23 UTC (rev 76161)
+++ zope.server/trunk/src/zope/server/http/wsgihttpserver.py	2007-06-02 18:26:38 UTC (rev 76162)
@@ -45,11 +45,29 @@
 
         HTTPServer.__init__(self, *args, **kw)
 
+    def _constructWSGIEnvironment(self, task):
+        env = task.getCGIEnvironment()
 
+        # deduce the URL scheme (http or https)
+        if (env.get('HTTPS', '').lower() == "on" or
+            env.get('SERVER_PORT_SECURE') == "1"):
+            protocol = 'https'
+        else:
+            protocol = 'http'
+
+        # the following environment variables are required by the WSGI spec
+        env['wsgi.version'] = (1,0)
+        env['wsgi.url_scheme'] = protocol
+        env['wsgi.errors'] = sys.stderr # apps should use the logging module
+        env['wsgi.multithread'] = True
+        env['wsgi.multiprocess'] = True
+        env['wsgi.run_once'] = False
+        env['wsgi.input'] = task.request_data.getBodyStream()
+        return env
+
     def executeRequest(self, task):
         """Overrides HTTPServer.executeRequest()."""
-        env = task.getCGIEnvironment()
-        env['wsgi.input'] = task.request_data.getBodyStream()
+        env = self._constructWSGIEnvironment(task)
 
         def start_response(status, headers):
             # Prepare the headers for output
@@ -69,8 +87,7 @@
 
     def executeRequest(self, task):
         """Overrides HTTPServer.executeRequest()."""
-        env = task.getCGIEnvironment()
-        env['wsgi.input'] = task.request_data.getBodyStream()
+        env = self._constructWSGIEnvironment(task)
         env['wsgi.handleErrors'] = False
 
         def start_response(status, headers):



More information about the Checkins mailing list