[Checkins] SVN: zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py Add more compatibility with zope.app.testing.

Sylvain Viollon sylvain at infrae.com
Wed Apr 14 12:00:05 EDT 2010


Log message for revision 110902:
  Add more compatibility with zope.app.testing.
  

Changed:
  U   zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py

-=-
Modified: zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py
===================================================================
--- zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py	2010-04-14 15:39:16 UTC (rev 110901)
+++ zope.app.wsgi/trunk/src/zope/app/wsgi/testlayer.py	2010-04-14 16:00:05 UTC (rev 110902)
@@ -46,6 +46,8 @@
         ZopeTestbrowser.__init__(self, *args, **kwargs)
 
 
+# Compatibility helpers to behave like zope.app.testing
+
 basicre = re.compile('Basic (.+)?:(.+)?$')
 def auth_header(header):
     """This function takes an authorization HTTP header and encode the
@@ -64,6 +66,13 @@
     return header
 
 
+def is_wanted_header(header):
+    """Return True if the given HTTP header is wanted (its key is not one of the unwanted ones).
+    """
+    key, value = header
+    return key.lower() not in ('x-content-type-warning', 'x-powered-by')
+
+
 class TestBrowserMiddleware(object):
     """This middleware makes the WSGI application compatible with the
     HTTPCaller behavior defined in zope.app.testing.functional:
@@ -75,20 +84,30 @@
       password into base 64 if it is Basic authentication.
     """
 
-    def __init__(self, app, root):
+    def __init__(self, app, root, handle_errors):
+        assert isinstance(handle_errors, bool)
         self.root = root
         self.app = app
+        self.default_handle_errors = str(handle_errors)
 
     def __call__(self, environ, start_response):
-        handle_errors = environ.get('HTTP_X_ZOPE_HANDLE_ERRORS', 'True')
+        # Handle debug mode
+        handle_errors = environ.get(
+            'HTTP_X_ZOPE_HANDLE_ERRORS', self.default_handle_errors)
         self.app.handleErrors = handle_errors == 'True'
 
+        # Handle authorization
         auth_key = 'HTTP_AUTHORIZATION'
         if environ.has_key(auth_key):
             environ[auth_key] = auth_header(environ[auth_key])
 
+        # Remove unwanted headers
+        def application_start_response(status, headers):
+            headers = filter(is_wanted_header, headers)
+            start_response(status, headers)
+
         commit()
-        for entry in self.app(environ, start_response):
+        for entry in self.app(environ, application_start_response):
             yield entry
         self.root._p_jar.sync()
 
@@ -107,8 +126,9 @@
         wsgi_app = WSGIPublisherApplication(
             self.db, HTTPPublicationRequestFactory, True)
 
-        def factory():
-            return TestBrowserMiddleware(wsgi_app, self.getRootFolder())
+        def factory(handle_errors=True):
+            return TestBrowserMiddleware(
+                wsgi_app, self.getRootFolder(), handle_errors)
 
         for host in TEST_HOSTS:
             wsgi_intercept.add_wsgi_intercept(host, 80, factory)
@@ -125,17 +145,49 @@
     """
 
 
-def http(string):
+class FakeResponse(object):
+    """This behaves like a Response object returned by HTTPCaller of
+    zope.app.testing.functional.
+    """
+
+    def __init__(self, response_text):
+        self.response_text = response_text
+
+    def getStatus(self):
+        line = self.getStatusString()
+        protocol, status, rest = line.split(' ', 2)
+        return int(status)
+
+    def getStatusString(self):
+        return self.response_text.split('\n', 1)[0]
+
+    def getBody(self):
+        parts = self.response_text.split('\n\n', 1)
+        if len(parts) < 2:
+            return ''
+        return parts[1]
+
+    def getOutput(self):
+        return self.response_text
+
+    __str__ = getOutput
+
+
+
+def http(string, handle_errors=True):
+    """This function behaves like the HTTPCaller of
+    zope.app.testing.functional.
+    """
     key = ('localhost', 80)
 
     if key not in wsgi_intercept._wsgi_intercept:
         raise NotInBrowserLayer(NotInBrowserLayer.__doc__)
 
     (app_fn, script_name) = wsgi_intercept._wsgi_intercept[key]
-    app = app_fn()
+    app = app_fn(handle_errors=handle_errors)
 
     socket = wsgi_intercept.wsgi_fake_socket(app, 'localhost', 80, '')
     socket.sendall(string.lstrip())
     result = socket.makefile()
-    return result.getvalue()
+    return FakeResponse(result.getvalue())
 



More information about the checkins mailing list