[Checkins] SVN: zope.server/trunk/src/zope/server/http/ Make
zope.server's HTTP server *really* WSGI compliant by adding
variables to the environment that are required by the WSGI spec.
Philipp von Weitershausen
philikon at philikon.de
Sat Jun 2 14:26:38 EDT 2007
Log message for revision 76162:
Make zope.server's HTTP server *really* WSGI compliant by adding variables to the
environment that are required by the WSGI spec.
Changed:
U zope.server/trunk/src/zope/server/http/tests/test_wsgiserver.py
U zope.server/trunk/src/zope/server/http/wsgihttpserver.py
-=-
Modified: zope.server/trunk/src/zope/server/http/tests/test_wsgiserver.py
===================================================================
--- zope.server/trunk/src/zope/server/http/tests/test_wsgiserver.py 2007-06-02 18:19:23 UTC (rev 76161)
+++ zope.server/trunk/src/zope/server/http/tests/test_wsgiserver.py 2007-06-02 18:26:38 UTC (rev 76162)
@@ -88,6 +88,37 @@
raise Conflict
+class WSGIInfo(object):
+ """Docstring required by publisher"""
+
+ def __call__(self, REQUEST):
+ """Return a list of variables beginning with 'wsgi.'"""
+ r = []
+ for name in REQUEST.keys():
+ if name.startswith('wsgi.'):
+ r.append(name)
+ return ' '.join(r)
+
+ def version(self, REQUEST):
+ """Return WSGI version"""
+ return str(REQUEST['wsgi.version'])
+
+ def url_scheme(self, REQUEST):
+ """Return WSGI URL scheme"""
+ return REQUEST['wsgi.url_scheme']
+
+ def multithread(self, REQUEST):
+ """Return WSGI multithreadedness"""
+ return str(bool(REQUEST['wsgi.multithread']))
+
+ def multiprocess(self, REQUEST):
+ """Return WSGI multiprocessedness"""
+ return str(bool(REQUEST['wsgi.multiprocess']))
+
+ def run_once(self, REQUEST):
+ """Return whether WSGI app is invoked only once or not"""
+ return str(bool(REQUEST['wsgi.run_once']))
+
class Tests(PlacelessSetup, unittest.TestCase):
def setUp(self):
@@ -97,8 +128,8 @@
obj = tested_object()
obj.folder = tested_object()
obj.folder.item = tested_object()
-
obj._protected = tested_object()
+ obj.wsgi = WSGIInfo()
pub = PublicationWithConflict(obj)
@@ -132,8 +163,7 @@
while self.run_loop:
poll(0.1, socket_map)
- def testResponse(self, path='/', status_expected=200,
- add_headers=None, request_body=''):
+ def invokeRequest(self, path='/', add_headers=None, request_body=''):
h = HTTPConnection(LOCALHOST, self.port)
h.putrequest('GET', path)
h.putheader('Accept', 'text/plain')
@@ -152,43 +182,76 @@
else:
response_body = ''
- # Please do not disable the status code check. It must work.
- self.failUnlessEqual(int(response.status), status_expected)
+ self.assertEqual(length, len(response_body))
- self.failUnlessEqual(length, len(response_body))
+ return response.status, response_body
- if (status_expected == 200):
- if path == '/': path = ''
- expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
- self.port, path)
- self.failUnlessEqual(response_body, expect_response)
def testDeeperPath(self):
- self.testResponse(path='/folder/item')
+ status, response_body = self.invokeRequest('/folder/item')
+ self.assertEqual(status, 200)
+ expect_response = 'URL invoked: http://%s:%d/folder/item' % (
+ LOCALHOST, self.port)
+ self.assertEqual(response_body, expect_response)
def testNotFound(self):
- self.testResponse(path='/foo/bar', status_expected=404)
+ status, response_body = self.invokeRequest('/foo/bar')
+ self.assertEqual(status, 404)
def testUnauthorized(self):
- self.testResponse(path='/_protected', status_expected=401)
+ status, response_body = self.invokeRequest('/_protected')
+ self.assertEqual(status, 401)
def testRedirectMethod(self):
- self.testResponse(path='/redirect_method', status_expected=303)
+ status, response_body = self.invokeRequest('/redirect_method')
+ self.assertEqual(status, 303)
def testRedirectException(self):
- self.testResponse(path='/redirect_exception', status_expected=303)
- self.testResponse(path='/folder/redirect_exception',
- status_expected=303)
+ status, response_body = self.invokeRequest('/redirect_exception')
+ self.assertEqual(status, 303)
+ status, response_body = self.invokeRequest('/folder/redirect_exception')
+ self.assertEqual(status, 303)
def testConflictRetry(self):
+ status, response_body = self.invokeRequest('/conflict?wait_tries=2')
# Expect the "Accepted" response since the retries will succeed.
- self.testResponse(path='/conflict?wait_tries=2', status_expected=202)
+ self.assertEqual(status, 202)
def testFailedConflictRetry(self):
+ status, response_body = self.invokeRequest('/conflict?wait_tries=10')
# Expect a "Conflict" response since there will be too many
# conflicts.
- self.testResponse(path='/conflict?wait_tries=10', status_expected=409)
+ self.assertEqual(status, 409)
+ def testWSGIVariables(self):
+ # Assert that the environment contains all required WSGI variables
+ status, response_body = self.invokeRequest('/wsgi')
+ wsgi_variables = set(response_body.split())
+ self.assertEqual(wsgi_variables,
+ set(['wsgi.version', 'wsgi.url_scheme', 'wsgi.input',
+ 'wsgi.errors', 'wsgi.multithread',
+ 'wsgi.multiprocess', 'wsgi.run_once']))
+
+ def testWSGIVersion(self):
+ status, response_body = self.invokeRequest('/wsgi/version')
+ self.assertEqual("(1, 0)", response_body)
+
+ def testWSGIURLScheme(self):
+ status, response_body = self.invokeRequest('/wsgi/url_scheme')
+ self.assertEqual('http', response_body)
+
+ def testWSGIMultithread(self):
+ status, response_body = self.invokeRequest('/wsgi/multithread')
+ self.assertEqual('True', response_body)
+
+ def testWSGIMultiprocess(self):
+ status, response_body = self.invokeRequest('/wsgi/multiprocess')
+ self.assertEqual('True', response_body)
+
+ def testWSGIRunOnce(self):
+ status, response_body = self.invokeRequest('/wsgi/run_once')
+ self.assertEqual('False', response_body)
+
def test_server_uses_iterable(self):
# Make sure that the task write method isn't called with a
# str or non iterable
@@ -205,9 +268,9 @@
list(v)
self.server.executeRequest(FakeTask())
+
def test_suite():
- loader = unittest.TestLoader()
- return loader.loadTestsFromTestCase(Tests)
+ return unittest.TestSuite(unittest.makeSuite(Tests))
-if __name__=='__main__':
- unittest.TextTestRunner().run(test_suite())
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
Modified: zope.server/trunk/src/zope/server/http/wsgihttpserver.py
===================================================================
--- zope.server/trunk/src/zope/server/http/wsgihttpserver.py 2007-06-02 18:19:23 UTC (rev 76161)
+++ zope.server/trunk/src/zope/server/http/wsgihttpserver.py 2007-06-02 18:26:38 UTC (rev 76162)
@@ -45,11 +45,29 @@
HTTPServer.__init__(self, *args, **kw)
+ def _constructWSGIEnvironment(self, task):
+ env = task.getCGIEnvironment()
+ # deduce the URL scheme (http or https)
+ if (env.get('HTTPS', '').lower() == "on" or
+ env.get('SERVER_PORT_SECURE') == "1"):
+ protocol = 'https'
+ else:
+ protocol = 'http'
+
+ # the following environment variables are required by the WSGI spec
+ env['wsgi.version'] = (1,0)
+ env['wsgi.url_scheme'] = protocol
+ env['wsgi.errors'] = sys.stderr # apps should use the logging module
+ env['wsgi.multithread'] = True
+ env['wsgi.multiprocess'] = True
+ env['wsgi.run_once'] = False
+ env['wsgi.input'] = task.request_data.getBodyStream()
+ return env
+
def executeRequest(self, task):
"""Overrides HTTPServer.executeRequest()."""
- env = task.getCGIEnvironment()
- env['wsgi.input'] = task.request_data.getBodyStream()
+ env = self._constructWSGIEnvironment(task)
def start_response(status, headers):
# Prepare the headers for output
@@ -69,8 +87,7 @@
def executeRequest(self, task):
"""Overrides HTTPServer.executeRequest()."""
- env = task.getCGIEnvironment()
- env['wsgi.input'] = task.request_data.getBodyStream()
+ env = self._constructWSGIEnvironment(task)
env['wsgi.handleErrors'] = False
def start_response(status, headers):
More information about the Checkins
mailing list