[Checkins] SVN: zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py Reinstate the AuthorizationMiddleware. Not sure whether this belongs in zope.testbrowser or in zope.app.wsgi. The 'http caller' now uses webtest instead of wsgi_intercept in order to find the app under test. Ugly import _APP_UNDER_TEST may need to go to zope.testbrowser. http caller returns a FakeResponse object in order to make tests pass. We could use the WebOb response in tests too.

Jan-Jaap Driessen jdriessen at thehealthagency.com
Fri Mar 4 18:25:55 EST 2011


Log message for revision 120756:
  Reinstate the AuthorizationMiddleware. Not sure whether this belongs in zope.testbrowser or in zope.app.wsgi.   The 'http caller' now uses webtest instead of wsgi_intercept in order to find the app under test. Ugly import _APP_UNDER_TEST may need to go to zope.testbrowser.    http caller returns a FakeResponse object in order to make tests pass. We could use the WebOb response in tests too.

Changed:
  U   zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py

-=-
Modified: zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py
===================================================================
--- zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py	2011-03-04 23:22:46 UTC (rev 120755)
+++ zope.app.wsgi/branches/janjaapdriessen-webtest/src/zope/app/wsgi/testlayer.py	2011-03-04 23:25:54 UTC (rev 120756)
@@ -14,12 +14,14 @@
 from StringIO import StringIO
 import httplib
 import xmlrpclib
+import base64
+import re
 
 import transaction
 from zope.app.appsetup.testlayer import ZODBLayer
 from zope.app.wsgi import WSGIPublisherApplication
-import wsgi_intercept
 import zope.testbrowser.wsgi
+from webtest import TestRequest
 
 # BBB
 from zope.testbrowser.wsgi import Browser
@@ -45,7 +47,55 @@
             yield entry
         self.root_factory()._p_jar.sync()
 
+basicre = re.compile('Basic (.+)?:(.+)?$')
 
+def auth_header(header):
+    """This function takes an authorization HTTP header and encode the
+    couple user, password into base 64 like the HTTP protocol wants
+    it.
+    """
+    match = basicre.match(header)
+    if match:
+        u, p = match.group(1, 2)
+        if u is None:
+            u = ''
+        if p is None:
+            p = ''
+        auth = base64.encodestring('%s:%s' % (u, p))
+        return 'Basic %s' % auth[:-1]
+    return header
+
+def is_wanted_header(header):
+    """Return True if the given HTTP header key is wanted.
+    """
+    key, value = header
+    return key.lower() not in ('x-content-type-warning', 'x-powered-by')
+
+class AuthorizationMiddleware(object):
+    """This middleware makes the WSGI application compatible with the
+    HTTPCaller behavior defined in zope.app.testing.functional:
+    - It modifies the HTTP Authorization header to encode user and
+      password into base64 if it is Basic authentication.
+    """
+
+    def __init__(self, wsgi_stack):
+        self.wsgi_stack = wsgi_stack
+
+    def __call__(self, environ, start_response):
+        # Handle authorization
+        auth_key = 'HTTP_AUTHORIZATION'
+        if auth_key in environ:
+            environ[auth_key] = auth_header(environ[auth_key])
+
+        # Remove unwanted headers
+        def application_start_response(status, headers, exc_info=None):
+            headers = filter(is_wanted_header, headers)
+            start_response(status, headers)
+
+        for entry in self.wsgi_stack(environ, application_start_response):
+            yield entry
+
+
 class HandleErrorsMiddleware(object):
     """This middleware makes the WSGI application compatible with the
     HTTPCaller behavior defined in zope.app.testing.functional:
@@ -73,9 +123,8 @@
     """This create a test layer with a test database and register a wsgi
     application to use that test database.
 
-    A wsgi_intercept handler is installed as well, so you can use a
-    WSGI version of zope.testbrowser Browser instance to access the
-    application.
+    You can use a WSGI version of zope.testbrowser Browser instance to access
+    the application.
     """
 
     def setup_middleware(self, app):
@@ -90,11 +139,11 @@
         # off of that in testSetUp()
         fake_db = object()
         self._application = WSGIPublisherApplication(fake_db)
-        return HandleErrorsMiddleware(
+        return AuthorizationMiddleware(HandleErrorsMiddleware(
             self._application,
             TransactionMiddleware(
                 self.getRootFolder,
-                self.setup_middleware(self._application)))
+                self.setup_middleware(self._application))))
 
     def testSetUp(self):
         super(BrowserLayer, self).testSetUp()
@@ -114,80 +163,47 @@
     zope.app.testing.functional.
     """
 
-    def __init__(self, response_text):
-        self.response_text = response_text
+    def __init__(self, response):
+        self.response = response
 
     def getStatus(self):
-        line = self.getStatusString()
-        status, rest = line.split(' ', 1)
-        return int(status)
+        return self.response.status_int
 
     def getStatusString(self):
-        status_line = self.response_text.split('\n', 1)[0]
-        protocol, status_string = status_line.split(' ', 1)
-        return status_string
+        return self.response.status
 
     def getHeader(self, name, default=None):
-        without_body = self.response_text.split('\n\n', 1)[0]
-        headers_text = without_body.split('\n', 1)[1]
-        headers = headers_text.split('\n')
-        result = []
-        for header in headers:
-            header_name, header_value = header.split(': ', 1)
-            if name == header_name:
-                result.append(header_value)
-        if not result:
-            return default
-        elif len(result) == 1:
-            return result[0]
-        else:
-            return result
+        return self.response.headers.get(name, default)
 
     def getHeaders(self):
-        without_body = self.response_text.split('\n\n', 1)[0]
-        headers_text = without_body.split('\n', 1)[1]
-        headers = headers_text.split('\n')
-        result = []
-        for header in headers:
-            header_name, header_value = header.split(':', 1)
-            result.append((header_name, header_value))
-        return result
+        return self.response.headerlist
 
     def getBody(self):
-        parts = self.response_text.split('\n\n', 1)
-        if len(parts) < 2:
-            return ''
-        return parts[1]
+        return self.response.body
 
     def getOutput(self):
-        return self.response_text
+        parts = ['HTTP/1.0 ' + self.response.status]
+        parts += map('%s: %s'.__mod__, self.response.headerlist)
+        if self.response.body:
+            parts += ['', self.response.body]
+        return '\n'.join(parts)
 
     __str__ = getOutput
 
-# XXX seems to only used by tests of zope.app.publication, maybe it should
-# be moved there
+
 def http(string, handle_errors=True):
-    """This function behave like the HTTPCaller of
-    zope.app.testing.functional.
-    """
-    key = ('localhost', 80)
 
-    if key not in wsgi_intercept._wsgi_intercept:
+    app = zope.testbrowser.wsgi._APP_UNDER_TEST
+    if app is None:
         raise NotInBrowserLayer(NotInBrowserLayer.__doc__)
 
-    (app_fn, script_name) = wsgi_intercept._wsgi_intercept[key]
-    app = app_fn()
+    request = TestRequest.from_file(StringIO(string))
+    request.headers['X-zope-handle-errors'] = str(handle_errors)
 
-    if not string.endswith('\n'):
-        string += '\n'
-    string += 'X-zope-handle-errors: %s\n' % handle_errors
+    response = request.get_response(app)
+    return FakeResponse(response)
 
-    socket = wsgi_intercept.wsgi_fake_socket(app, 'localhost', 80, '')
-    socket.sendall(string.lstrip())
-    result = socket.makefile()
-    return FakeResponse(result.getvalue())
 
-
 class FakeSocket(object):
 
     def __init__(self, data):



More information about the checkins mailing list