[Checkins] SVN: zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py Reinstate the AuthorizationMiddleware. Not sure whether this belongs in zope.testbrowser or in zope.app.wsgi. The 'http caller' now uses webtest instead of wsgi_intercept in order to find the app under test. Ugly import _APP_UNDER_TEST may need to go to zope.testbrowser. http caller returns a FakeResponse object in order to make tests pass. We could use the WebOb response in tests too.
Jan-Jaap Driessen
jdriessen at thehealthagency.com
Fri Mar 4 18:25:55 EST 2011
Log message for revision 120756:
Reinstate the AuthorizationMiddleware. Not sure whether this belongs in zope.testbrowser or in zope.app.wsgi. The 'http caller' now uses webtest instead of wsgi_intercept in order to find the app under test. Ugly import _APP_UNDER_TEST may need to go to zope.testbrowser. http caller returns a FakeResponse object in order to make tests pass. We could use the WebOb response in tests too.
Changed:
U zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py
-=-
Modified: zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py
===================================================================
--- zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py 2011-03-04 23:22:46 UTC (rev 120755)
+++ zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py 2011-03-04 23:25:54 UTC (rev 120756)
@@ -14,12 +14,14 @@
from StringIO import StringIO
import httplib
import xmlrpclib
+import base64
+import re
import transaction
from zope.app.appsetup.testlayer import ZODBLayer
from zope.app.wsgi import WSGIPublisherApplication
-import wsgi_intercept
import zope.testbrowser.wsgi
+from webtest import TestRequest
# BBB
from zope.testbrowser.wsgi import Browser
@@ -45,7 +47,55 @@
yield entry
self.root_factory()._p_jar.sync()
+basicre = re.compile('Basic (.+)?:(.+)?$')
+def auth_header(header):
+ """This function takes an authorization HTTP header and encode the
+ couple user, password into base 64 like the HTTP protocol wants
+ it.
+ """
+ match = basicre.match(header)
+ if match:
+ u, p = match.group(1, 2)
+ if u is None:
+ u = ''
+ if p is None:
+ p = ''
+ auth = base64.encodestring('%s:%s' % (u, p))
+ return 'Basic %s' % auth[:-1]
+ return header
+
+def is_wanted_header(header):
+ """Return True if the given HTTP header key is wanted.
+ """
+ key, value = header
+ return key.lower() not in ('x-content-type-warning', 'x-powered-by')
+
+class AuthorizationMiddleware(object):
+ """This middleware makes the WSGI application compatible with the
+ HTTPCaller behavior defined in zope.app.testing.functional:
+ - It modifies the HTTP Authorization header to encode user and
+ password into base64 if it is Basic authentication.
+ """
+
+ def __init__(self, wsgi_stack):
+ self.wsgi_stack = wsgi_stack
+
+ def __call__(self, environ, start_response):
+ # Handle authorization
+ auth_key = 'HTTP_AUTHORIZATION'
+ if auth_key in environ:
+ environ[auth_key] = auth_header(environ[auth_key])
+
+ # Remove unwanted headers
+ def application_start_response(status, headers, exc_info=None):
+ headers = filter(is_wanted_header, headers)
+ start_response(status, headers)
+
+ for entry in self.wsgi_stack(environ, application_start_response):
+ yield entry
+
+
class HandleErrorsMiddleware(object):
"""This middleware makes the WSGI application compatible with the
HTTPCaller behavior defined in zope.app.testing.functional:
@@ -73,9 +123,8 @@
"""This create a test layer with a test database and register a wsgi
application to use that test database.
- A wsgi_intercept handler is installed as well, so you can use a
- WSGI version of zope.testbrowser Browser instance to access the
- application.
+ You can use a WSGI version of zope.testbrowser Browser instance to access
+ the application.
"""
def setup_middleware(self, app):
@@ -90,11 +139,11 @@
# off of that in testSetUp()
fake_db = object()
self._application = WSGIPublisherApplication(fake_db)
- return HandleErrorsMiddleware(
+ return AuthorizationMiddleware(HandleErrorsMiddleware(
self._application,
TransactionMiddleware(
self.getRootFolder,
- self.setup_middleware(self._application)))
+ self.setup_middleware(self._application))))
def testSetUp(self):
super(BrowserLayer, self).testSetUp()
@@ -114,80 +163,47 @@
zope.app.testing.functional.
"""
- def __init__(self, response_text):
- self.response_text = response_text
+ def __init__(self, response):
+ self.response = response
def getStatus(self):
- line = self.getStatusString()
- status, rest = line.split(' ', 1)
- return int(status)
+ return self.response.status_int
def getStatusString(self):
- status_line = self.response_text.split('\n', 1)[0]
- protocol, status_string = status_line.split(' ', 1)
- return status_string
+ return self.response.status
def getHeader(self, name, default=None):
- without_body = self.response_text.split('\n\n', 1)[0]
- headers_text = without_body.split('\n', 1)[1]
- headers = headers_text.split('\n')
- result = []
- for header in headers:
- header_name, header_value = header.split(': ', 1)
- if name == header_name:
- result.append(header_value)
- if not result:
- return default
- elif len(result) == 1:
- return result[0]
- else:
- return result
+ return self.response.headers.get(name, default)
def getHeaders(self):
- without_body = self.response_text.split('\n\n', 1)[0]
- headers_text = without_body.split('\n', 1)[1]
- headers = headers_text.split('\n')
- result = []
- for header in headers:
- header_name, header_value = header.split(':', 1)
- result.append((header_name, header_value))
- return result
+ return self.response.headerlist
def getBody(self):
- parts = self.response_text.split('\n\n', 1)
- if len(parts) < 2:
- return ''
- return parts[1]
+ return self.response.body
def getOutput(self):
- return self.response_text
+ parts = ['HTTP/1.0 ' + self.response.status]
+ parts += map('%s: %s'.__mod__, self.response.headerlist)
+ if self.response.body:
+ parts += ['', self.response.body]
+ return '\n'.join(parts)
__str__ = getOutput
-# XXX seems to only used by tests of zope.app.publication, maybe it should
-# be moved there
+
def http(string, handle_errors=True):
- """This function behave like the HTTPCaller of
- zope.app.testing.functional.
- """
- key = ('localhost', 80)
- if key not in wsgi_intercept._wsgi_intercept:
+ app = zope.testbrowser.wsgi._APP_UNDER_TEST
+ if app is None:
raise NotInBrowserLayer(NotInBrowserLayer.__doc__)
- (app_fn, script_name) = wsgi_intercept._wsgi_intercept[key]
- app = app_fn()
+ request = TestRequest.from_file(StringIO(string))
+ request.headers['X-zope-handle-errors'] = str(handle_errors)
- if not string.endswith('\n'):
- string += '\n'
- string += 'X-zope-handle-errors: %s\n' % handle_errors
+ response = request.get_response(app)
+ return FakeResponse(response)
- socket = wsgi_intercept.wsgi_fake_socket(app, 'localhost', 80, '')
- socket.sendall(string.lstrip())
- result = socket.makefile()
- return FakeResponse(result.getvalue())
-
class FakeSocket(object):
def __init__(self, data):
More information about the checkins
mailing list