[Checkins] SVN: zope.testbrowser/jinty-webtest3-minimal/s Replace wsgi-intercept implementation with WebTest implementation

Brian Sutherland jinty at web.de
Sat Feb 12 12:41:32 EST 2011


Log message for revision 120302:
  Replace wsgi-intercept implementation with WebTest implementation

Changed:
  U   zope.testbrowser/jinty-webtest3-minimal/setup.py
  U   zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/README.txt
  A   zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/connection.py
  U   zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/wsgi.py

-=-
Modified: zope.testbrowser/jinty-webtest3-minimal/setup.py
===================================================================
--- zope.testbrowser/jinty-webtest3-minimal/setup.py	2011-02-12 17:38:22 UTC (rev 120301)
+++ zope.testbrowser/jinty-webtest3-minimal/setup.py	2011-02-12 17:41:32 UTC (rev 120302)
@@ -75,7 +75,7 @@
             'zope.app.testing',
             ],
         'wsgi': [
-            'wsgi_intercept',
+            'WebTest',
             ]
         },
     include_package_data = True,

Modified: zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/README.txt
===================================================================
--- zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/README.txt	2011-02-12 17:38:22 UTC (rev 120301)
+++ zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/README.txt	2011-02-12 17:41:32 UTC (rev 120302)
@@ -25,14 +25,20 @@
 applications, it can be imported from ``zope.testbrowser.wsgi``:
 
     >>> from zope.testbrowser.wsgi import Browser
-    >>> browser = Browser()
+    >>> from wsgiref.simple_server import demo_app
+    >>> browser = Browser('http://localhost/', wsgi_app=demo_app)
+    >>> print browser.contents
+    Hello world!
+    ...
 
-.. _`wsgi_intercept`: http://pypi.python.org/pypi/wsgi_intercept
+.. _`WebTest`: http://pypi.python.org/pypi/WebTest
 
 To use this browser you have to:
 
   * use the `wsgi` extra of the ``zope.testbrowser`` egg,
 
+You can also use it with zope layers if you:
+
   * write a subclass of ``zope.testbrowser.wsgi.Layer`` and override the
     ``make_wsgi_app`` method,
 

Added: zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/connection.py
===================================================================
--- zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/connection.py	                        (rev 0)
+++ zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/connection.py	2011-02-12 17:41:32 UTC (rev 120302)
@@ -0,0 +1,96 @@
+##############################################################################
+#
+# Copyright (c) 2005 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Base classes sometimes useful to implement browsers
+"""
+import cStringIO
+import httplib
+import mechanize
+import socket
+import sys
+import zope.testbrowser.browser
+
+
+class Response(object):
+    """``mechanize`` compatible response object."""
+
+    def __init__(self, content, headers, status, reason):
+        self.content = content
+        self.status = status
+        self.reason = reason
+        self.msg = httplib.HTTPMessage(cStringIO.StringIO(headers), 0)
+        self.content_as_file = cStringIO.StringIO(self.content)
+
+    def read(self, amt=None):
+        return self.content_as_file.read(amt)
+
+    def close(self):
+        """To overcome changes in mechanize and socket in python2.5"""
+        pass
+
+class HTTPHandler(mechanize.HTTPHandler):
+
+    def _connect(self, *args, **kw):
+        raise NotImplementedError("implement")
+
+    def http_request(self, req):
+        # look at data and set content type
+        if req.has_data():
+            data = req.get_data()
+            if isinstance(data, dict):
+                req.add_data(data['body'])
+                req.add_unredirected_header('Content-type',
+                                            data['content-type'])
+        return mechanize.HTTPHandler.do_request_(self, req)
+
+    https_request = http_request
+
+    def http_open(self, req):
+        """Open an HTTP connection having a ``mechanize`` request."""
+        # Here we connect to the publisher.
+        if sys.version_info > (2, 6) and not hasattr(req, 'timeout'):
+            # Workaround mechanize incompatibility with Python
+            # 2.6. See: LP #280334
+            req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
+        return self.do_open(self._connect, req)
+
+    https_open = http_open
+
+class MechanizeBrowser(mechanize.Browser):
+    """Special ``mechanize`` browser using the Zope Publisher HTTP handler."""
+
+    default_schemes = ['http']
+    default_others = ['_http_error', '_http_default_error']
+    default_features = ['_redirect', '_cookies', '_referer', '_refresh',
+                        '_equiv', '_basicauth', '_digestauth']
+
+
+    def __init__(self, *args, **kws):
+        inherited_handlers = ['_unknown', '_http_error',
+            '_http_default_error', '_basicauth',
+            '_digestauth', '_redirect', '_cookies', '_referer',
+            '_refresh', '_equiv', '_gzip']
+
+        self.handler_classes = {"http": self._http_handler}
+        for name in inherited_handlers:
+            self.handler_classes[name] = mechanize.Browser.handler_classes[name]
+
+        kws['request_class'] = kws.get('request_class',
+                                       mechanize._request.Request)
+
+        mechanize.Browser.__init__(self, *args, **kws)
+
+    def _http_handler(self, *args, **kw):
+        return NotImplementedError("Try return a sub-class of PublisherHTTPHandler here")
+
+


Property changes on: zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/connection.py
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/wsgi.py
===================================================================
--- zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/wsgi.py	2011-02-12 17:38:22 UTC (rev 120301)
+++ zope.testbrowser/jinty-webtest3-minimal/src/zope/testbrowser/wsgi.py	2011-02-12 17:41:32 UTC (rev 120302)
@@ -11,93 +11,143 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-import base64
-import re
-import wsgi_intercept
-import wsgi_intercept.mechanize_intercept
+"""WSGI-specific testing code
+"""
+
+import sys
+
+from webtest import TestApp
+
 import zope.testbrowser.browser
+import zope.testbrowser.connection
 
+class WSGIConnection(object):
+    """A ``mechanize`` compatible connection object."""
 
-# List of hostname where the test browser/http function replies to
-TEST_HOSTS = ['localhost', '127.0.0.1']
+    def __init__(self, test_app, host, timeout=None):
+        self._test_app = TestApp(test_app)
+        self.host = host
 
+    def set_debuglevel(self, level):
+        pass
 
-class InterceptBrowser(wsgi_intercept.mechanize_intercept.Browser):
+    def _quote(self, url):
+        # XXX: is this necessary with WebTest? Was cargo-culted from the
+        # Zope Publisher Connection
+        return url.replace(' ', '%20')
 
-    default_schemes = ['http']
-    default_others = ['_http_error',
-                      '_http_default_error']
-    default_features = ['_redirect', '_cookies', '_referer', '_refresh',
-                        '_equiv', '_basicauth', '_digestauth']
+    def request(self, method, url, body=None, headers=None):
+        """Send a request to the publisher.
 
+        The response will be stored in ``self.response``.
+        """
+        if body is None:
+            body = ''
 
-class Browser(zope.testbrowser.browser.Browser):
-    """Override the zope.testbrowser.browser.Browser interface so that it
-    uses InterceptBrowser.
-    """
+        if url == '':
+            url = '/'
 
-    def __init__(self, *args, **kw):
-        kw['mech_browser'] = InterceptBrowser()
-        super(Browser, self).__init__(*args, **kw)
+        url = self._quote(url)
 
+        # Extract the handle_error option header
+        if sys.version_info >= (2,5):
+            handle_errors_key = 'X-Zope-Handle-Errors'
+        else:
+            handle_errors_key = 'X-zope-handle-errors'
+        handle_errors_header = headers.get(handle_errors_key, True)
+        if handle_errors_key in headers:
+            del headers[handle_errors_key]
 
-# Compatibility helpers to behave like zope.app.testing
+        # Translate string to boolean.
+        handle_errors = {'False': False}.get(handle_errors_header, True)
+        extra_environ = {}
+        if not handle_errors:
+            # There doesn't seem to be a "Right Way" to do this
+            extra_environ['wsgi.handleErrors'] = False # zope.app.wsgi does this
+            extra_environ['paste.throw_errors'] = True # the paste way of doing this
 
-basicre = re.compile('Basic (.+)?:(.+)?$')
+        scheme_key = 'X-Zope-Scheme'
+        extra_environ['wsgi.url_scheme'] = headers.get(scheme_key, 'http')
+        if scheme_key in headers:
+            del headers[scheme_key]
 
+        app = self._test_app
 
-def auth_header(header):
-    """This function takes an authorization HTTP header and encode the
-    couple user, password into base 64 like the HTTP protocol wants
-    it.
-    """
-    match = basicre.match(header)
-    if match:
-        u, p = match.group(1, 2)
-        if u is None:
-            u = ''
-        if p is None:
-            p = ''
-        auth = base64.encodestring('%s:%s' % (u, p))
-        return 'Basic %s' % auth[:-1]
-    return header
+        # clear our app cookies so that our testbrowser cookie headers don't
+        # get stomped
+        app.cookies.clear()
 
+        # pass the request to webtest
+        if method == 'GET':
+            assert not body, body
+            response = app.get(url, headers=headers, expect_errors=True, extra_environ=extra_environ)
+        elif method == 'POST':
+            response = app.post(url, body, headers=headers, expect_errors=True, extra_environ=extra_environ)
+        else:
+            raise Exception('Couldnt handle method %s' % method)
 
-def is_wanted_header(header):
-    """Return True if the given HTTP header key is wanted.
-    """
-    key, value = header
-    return key.lower() not in ('x-content-type-warning', 'x-powered-by')
+        self.response = response
 
+    def getresponse(self):
+        """Return a ``mechanize`` compatible response.
 
-class AuthorizationMiddleware(object):
-    """This middleware makes the WSGI application compatible with the
-    HTTPCaller behavior defined in zope.app.testing.functional:
-    - It modifies the HTTP Authorization header to encode user and
-      password into base64 if it is Basic authentication.
-    """
+        The goal of this method is to convert WebTest's response to
+        a ``mechanize`` compatible response, which is also understood by
+        mechanize.
+        """
+        response = self.response
+        status = int(response.status[:3])
+        reason = response.status[4:]
 
-    def __init__(self, wsgi_stack):
-        self.wsgi_stack = wsgi_stack
+        headers = response.headers.items()
+        headers.sort()
+        headers.insert(0, ('Status', response.status))
+        headers = '\r\n'.join('%s: %s' % h for h in headers)
+        content = response.body
+        return zope.testbrowser.connection.Response(content, headers, status, reason)
 
-    def __call__(self, environ, start_response):
-        # Handle authorization
-        auth_key = 'HTTP_AUTHORIZATION'
-        if auth_key in environ:
-            environ[auth_key] = auth_header(environ[auth_key])
 
-        # Remove unwanted headers
-        def application_start_response(status, headers, exc_info=None):
-            headers = filter(is_wanted_header, headers)
-            start_response(status, headers)
+class WSGIHTTPHandler(zope.testbrowser.connection.HTTPHandler):
 
-        for entry in self.wsgi_stack(environ, application_start_response):
-            yield entry
+    def __init__(self, test_app, *args, **kw):
+        self._test_app = test_app
+        zope.testbrowser.connection.HTTPHandler.__init__(self, *args, **kw)
 
+    def _connect(self, *args, **kw):
+        return WSGIConnection(self._test_app, *args, **kw)
 
+    def https_request(self, req):
+        req.add_unredirected_header('X-Zope-Scheme', 'https')
+        return self.http_request(req)
+
+
+class WSGIMechanizeBrowser(zope.testbrowser.connection.MechanizeBrowser):
+    """Special ``mechanize`` browser using the WSGI HTTP handler."""
+
+    def __init__(self, test_app, *args, **kw):
+        self._test_app = test_app
+        zope.testbrowser.connection.MechanizeBrowser.__init__(self, *args, **kw)
+
+    def _http_handler(self, *args, **kw):
+        return WSGIHTTPHandler(self._test_app, *args, **kw)
+
+
+class Browser(zope.testbrowser.browser.Browser):
+    """A WSGI `testbrowser` Browser that uses a WebTest wrapped WSGI app."""
+
+    def __init__(self, url=None, wsgi_app=None):
+        if wsgi_app is None:
+            wsgi_app = _APP_UNDER_TEST
+        if wsgi_app is None:
+            raise AssertionError("wsgi_app not provided or zope.testbrowser.wsgi.Layer not setup")
+        mech_browser = WSGIMechanizeBrowser(wsgi_app)
+        super(Browser, self).__init__(url=url, mech_browser=mech_browser)
+
+_APP_UNDER_TEST = None # setup and torn down by the Layer class
+
 class Layer(object):
     """Test layer which sets up WSGI application for use with
-    wsgi_intercept/testbrowser.
+    WebTest/testbrowser.
 
     """
 
@@ -117,13 +167,10 @@
 
     def setUp(self):
         self.cooperative_super('setUp')
-        self.app = self.make_wsgi_app()
-        factory = lambda: AuthorizationMiddleware(self.app)
+        global _APP_UNDER_TEST
+        _APP_UNDER_TEST = self.make_wsgi_app()
 
-        for host in TEST_HOSTS:
-            wsgi_intercept.add_wsgi_intercept(host, 80, factory)
-
     def tearDown(self):
-        for host in TEST_HOSTS:
-            wsgi_intercept.remove_wsgi_intercept(host, 80)
+        global _APP_UNDER_TEST
+        _APP_UNDER_TEST = None
         self.cooperative_super('tearDown')



More information about the checkins mailing list