[Checkins] SVN: zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py Add more compatiblity with zope.app.testing.
Sylvain Viollon
sylvain at infrae.com
Wed Apr 14 12:00:05 EDT 2010
Log message for revision 110902:
Add more compatiblity with zope.app.testing.
Changed:
U zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py
-=-
Modified: zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py
===================================================================
--- zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py 2010-04-14 15:39:16 UTC (rev 110901)
+++ zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py 2010-04-14 16:00:05 UTC (rev 110902)
@@ -46,6 +46,8 @@
ZopeTestbrowser.__init__(self, *args, **kwargs)
+# Compatibility helpers to behave like zope.app.testing
+
basicre = re.compile('Basic (.+)?:(.+)?$')
def auth_header(header):
"""This function takes an authorization HTTP header and encode the
@@ -64,6 +66,13 @@
return header
+def is_wanted_header(header):
+ """Return True if the given HTTP header key is unwanted.
+ """
+ key, value = header
+ return key.lower() not in ('x-content-type-warning', 'x-powered-by')
+
+
class TestBrowserMiddleware(object):
"""This middleware makes the WSGI application compatible with the
HTTPCaller behavior defined in zope.app.testing.functional:
@@ -75,20 +84,30 @@
password into base 64 if it is Basic authentication.
"""
- def __init__(self, app, root):
+ def __init__(self, app, root, handle_errors):
+ assert isinstance(handle_errors, bool)
self.root = root
self.app = app
+ self.default_handle_errors = str(handle_errors)
def __call__(self, environ, start_response):
- handle_errors = environ.get('HTTP_X_ZOPE_HANDLE_ERRORS', 'True')
+ # Handle debug mode
+ handle_errors = environ.get(
+ 'HTTP_X_ZOPE_HANDLE_ERRORS', self.default_handle_errors)
self.app.handleErrors = handle_errors == 'True'
+ # Handle authorization
auth_key = 'HTTP_AUTHORIZATION'
if environ.has_key(auth_key):
environ[auth_key] = auth_header(environ[auth_key])
+ # Remove unwanted headers
+ def application_start_response(status, headers):
+ headers = filter(is_wanted_header, headers)
+ start_response(status, headers)
+
commit()
- for entry in self.app(environ, start_response):
+ for entry in self.app(environ, application_start_response):
yield entry
self.root._p_jar.sync()
@@ -107,8 +126,9 @@
wsgi_app = WSGIPublisherApplication(
self.db, HTTPPublicationRequestFactory, True)
- def factory():
- return TestBrowserMiddleware(wsgi_app, self.getRootFolder())
+ def factory(handle_errors=True):
+ return TestBrowserMiddleware(
+ wsgi_app, self.getRootFolder(), handle_errors)
for host in TEST_HOSTS:
wsgi_intercept.add_wsgi_intercept(host, 80, factory)
@@ -125,17 +145,49 @@
"""
-def http(string):
+class FakeResponse(object):
+ """This behave like a Response object returned by HTTPCaller of
+ zope.app.testing.functional.
+ """
+
+ def __init__(self, response_text):
+ self.response_text = response_text
+
+ def getStatus(self):
+ line = self.getStatusString()
+ protocol, status, rest = line.split(' ', 2)
+ return int(status)
+
+ def getStatusString(self):
+ return self.response_text.split('\n', 1)[0]
+
+ def getBody(self):
+ parts = self.response_text.split('\n\n', 1)
+ if len(parts) < 2:
+ return ''
+ return parts[1]
+
+ def getOutput(self):
+ return self.response_text
+
+ __str__ = getOutput
+
+
+
+def http(string, handle_errors=True):
+ """This function behave like the HTTPCaller of
+ zope.app.testing.functional.
+ """
key = ('localhost', 80)
if key not in wsgi_intercept._wsgi_intercept:
raise NotInBrowserLayer(NotInBrowserLayer.__doc__)
(app_fn, script_name) = wsgi_intercept._wsgi_intercept[key]
- app = app_fn()
+ app = app_fn(handle_errors=handle_errors)
socket = wsgi_intercept.wsgi_fake_socket(app, 'localhost', 80, '')
socket.sendall(string.lstrip())
result = socket.makefile()
- return result.getvalue()
+ return FakeResponse(result.getvalue())
More information about the checkins
mailing list