[Zope3-checkins] SVN: Zope3/branches/srichter-blow-services/src/zope/app/testing/ All test support code is now in zope.app.testing. Code from any tests

Stephan Richter srichter at cosmos.phy.tufts.edu
Tue Dec 21 11:55:28 EST 2004


Log message for revision 28669:
  All test support code is now in zope.app.testing. Code from any tests 
  module or package should *never* be reused.
  

Changed:
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/__init__.py
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/dochttp.py
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/dochttp.txt
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/functional.py
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/placelesssetup.py
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/recorded/
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/setup.py
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/test.py
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/tests.py
  A   Zope3/branches/srichter-blow-services/src/zope/app/testing/ztapi.py

-=-
Added: Zope3/branches/srichter-blow-services/src/zope/app/testing/__init__.py
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/testing/__init__.py	2004-12-21 16:39:25 UTC (rev 28668)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/__init__.py	2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1 @@
+# Make directory a Python package.

Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/dochttp.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/dochttp.py)

Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/dochttp.txt (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/dochttp.txt)

Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/functional.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/functional.py)
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/tests/functional.py	2004-12-17 21:36:22 UTC (rev 28644)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/functional.py	2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,634 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Functional testing framework for Zope 3.
+
+There should be a file 'ftesting.zcml' in the current directory.
+
+$Id$
+"""
+import logging
+import re
+import rfc822
+import sys
+import traceback
+import unittest
+import urllib
+
+from StringIO import StringIO
+from Cookie import SimpleCookie
+
+from transaction import abort, commit
+from ZODB.DB import DB
+from ZODB.DemoStorage import DemoStorage
+import zope.interface
+from zope.publisher.browser import BrowserRequest
+from zope.publisher.http import HTTPRequest
+from zope.publisher.publish import publish
+from zope.publisher.xmlrpc import XMLRPCRequest
+from zope.security.interfaces import Forbidden, Unauthorized
+from zope.security.management import endInteraction
+import zope.publisher.interfaces.http
+from zope.testing import doctest
+
+import zope.app.pluggableauth
+import zope.app.testing.setup
+
+from zope.app import zapi
+from zope.app.debug import Debugger
+from zope.app.publication.http import HTTPPublication
+from zope.app.publication.browser import BrowserPublication
+from zope.app.publication.xmlrpc import XMLRPCPublication
+from zope.app.publication.zopepublication import ZopePublication
+from zope.app.publication.http import HTTPPublication
+from zope.publisher.interfaces.browser import IDefaultBrowserLayer
+from zope.publisher.interfaces.browser import IDefaultSkin
+from zope.publisher.interfaces.browser import IBrowserRequest
+from zope.app.component.hooks import setSite, getSite
+
+HTTPTaskStub = StringIO
+
+
+class ResponseWrapper(object):
+    """A wrapper that adds several introspective methods to a response."""
+
+    def __init__(self, response, outstream, path):
+        self._response = response
+        self._outstream = outstream
+        self._path = path
+
+    def getOutput(self):
+        """Returns the full HTTP output (headers + body)"""
+        return self._outstream.getvalue()
+
+    def getBody(self):
+        """Returns the response body"""
+        output = self._outstream.getvalue()
+        idx = output.find('\r\n\r\n')
+        if idx == -1:
+            return None
+        else:
+            return output[idx+4:]
+
+    def getPath(self):
+        """Returns the path of the request"""
+        return self._path
+
+    def __getattr__(self, attr):
+        return getattr(self._response, attr)
+
+
+def _getDefaultSkin():
+    """Returns the current default skin as an interface."""
+    adapters = zapi.getService(zapi.servicenames.Adapters)
+    skin = adapters.lookup((IBrowserRequest,), IDefaultSkin, '')
+    return skin or IDefaultBrowserLayer
+
+
+grant_request = (r"""
+POST /@@grant.html HTTP/1.1
+Authorization: Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3
+Content-Length: 5796
+Content-Type: application/x-www-form-urlencoded
+
+field.principal=em9wZS5tZ3I_"""
+"""&field.principal.displayed=y"""
+"""&GRANT_SUBMIT=Change"""
+"""&field.em9wZS5tZ3I_.role.zope.Manager=allow"""
+"""&field.em9wZS5tZ3I_.role.zope.Manager-empty-marker=1""")
+
+class FunctionalTestSetup(object):
+    """Keeps shared state across several functional test cases."""
+
+    __shared_state = { '_init': False }
+
+    def __init__(self, config_file=None):
+        """Initializes Zope 3 framework.
+
+        Creates a volatile memory storage.  Parses Zope3 configuration files.
+        """
+        self.__dict__ = self.__shared_state
+
+        if not self._init:
+
+            # Make sure unit tests are cleaned up
+            zope.app.tests.setup.placefulSetUp()
+            zope.app.tests.setup.placefulTearDown()
+
+            if not config_file:
+                config_file = 'ftesting.zcml'
+            self.log = StringIO()
+            # Make it silent but keep the log available for debugging
+            logging.root.addHandler(logging.StreamHandler(self.log))
+            self.base_storage = DemoStorage("Memory Storage")
+            self.db = DB(self.base_storage)
+            self.app = Debugger(self.db, config_file)
+            self.connection = None
+            self._config_file = config_file
+            self._init = True
+
+            # Make a local grant for the test user
+            # TODO, find a better way to make this grant happen.
+            # The way I did this is way too messy, given how
+            # strang FunctionalTestSetup is.  Later, when we
+            # have time, we should clean up this (perhaps with an
+            # event) and clean up FunctionalTestSetup.
+            response = http(grant_request, handle_errors=False)
+            FunctionalTestSetup().connection = None
+            
+        elif config_file and config_file != self._config_file:
+            # Running different tests with different configurations is not
+            # supported at the moment
+            raise NotImplementedError('Already configured'
+                                      ' with a different config file')
+
+    def setUp(self):
+        """Prepares for a functional test case."""
+        # Tear down the old demo storage (if any) and create a fresh one
+        abort()
+        self.db.close()
+        storage = DemoStorage("Demo Storage", self.base_storage)
+        self.db = self.app.db = DB(storage)
+        self.connection = None
+
+    def tearDown(self):
+        """Cleans up after a functional test case."""
+        abort()
+        if self.connection:
+            self.connection.close()
+            self.connection = None
+        self.db.close()
+
+    def getRootFolder(self):
+        """Returns the Zope root folder."""
+        if not self.connection:
+            self.connection = self.db.open()
+        root = self.connection.root()
+        return root[ZopePublication.root_name]
+
+    def getApplication(self):
+        """Returns the Zope application instance."""
+        return self.app
+
+
+class FunctionalTestCase(unittest.TestCase):
+    """Functional test case."""
+
+    def setUp(self):
+        """Prepares for a functional test case."""
+        super(FunctionalTestCase, self).setUp()
+        FunctionalTestSetup().setUp()
+
+    def tearDown(self):
+        """Cleans up after a functional test case."""
+
+        FunctionalTestSetup().tearDown()
+        super(FunctionalTestCase, self).tearDown()
+
+    def getRootFolder(self):
+        """Returns the Zope root folder."""
+        return FunctionalTestSetup().getRootFolder()
+
+    def commit(self):
+        commit()
+
+    def abort(self):
+        abort()
+
+class BrowserTestCase(FunctionalTestCase):
+    """Functional test case for Browser requests."""
+
+    def setUp(self):
+        super(BrowserTestCase, self).setUp()
+        # Somewhere to store cookies between consecutive requests
+        self.cookies = SimpleCookie()
+
+    def tearDown(self):
+        del self.cookies
+
+        self.setSite(None)
+        super(BrowserTestCase, self).tearDown()
+
+    def setSite(self, site):
+        """Set the site which will be used to look up local services"""
+        setSite(site)
+
+    def getSite(self):
+        """Returns the site which is used to look up local services"""
+        return getSite()
+
+    def makeRequest(self, path='', basic=None, form=None, env={},
+                    outstream=None):
+        """Creates a new request object.
+
+        Arguments:
+          path   -- the path to be traversed (e.g. "/folder1/index.html")
+          basic  -- basic HTTP authentication credentials ("user:password")
+          form   -- a dictionary emulating a form submission
+                    (Note that field values should be Unicode strings)
+          env    -- a dictionary of additional environment variables
+                    (You can emulate HTTP request header
+                       X-Header: foo
+                     by adding 'HTTP_X_HEADER': 'foo' to env)
+          outstream -- a stream where the HTTP response will be written
+        """
+        if outstream is None:
+            outstream = HTTPTaskStub()
+        environment = {"HTTP_HOST": 'localhost',
+                       "HTTP_REFERER": 'localhost',
+                       "HTTP_COOKIE": self.__http_cookie(path)}
+        environment.update(env)
+        app = FunctionalTestSetup().getApplication()
+        request = app._request(path, '', outstream,
+                               environment=environment,
+                               basic=basic, form=form,
+                               request=BrowserRequest)
+        zope.interface.directlyProvides(request, _getDefaultSkin())
+        return request
+
+    def __http_cookie(self, path):
+        '''Return self.cookies as an HTTP_COOKIE environment format string'''
+        l = [m.OutputString() for m in self.cookies.values()
+                if path.startswith(m['path'])]
+        return '; '.join(l)
+
+    def publish(self, path, basic=None, form=None, env={},
+                handle_errors=False):
+        """Renders an object at a given location.
+
+        Arguments are the same as in makeRequest with the following exception:
+          handle_errors  -- if False (default), exceptions will not be caught
+                            if True, exceptions will return a formatted error
+                            page.
+
+        Returns the response object enhanced with the following methods:
+          getOutput()    -- returns the full HTTP output as a string
+          getBody()      -- returns the full response body as a string
+          getPath()      -- returns the path used in the request
+        """
+        outstream = HTTPTaskStub()
+        old_site = self.getSite()
+        self.setSite(None)
+        # A cookie header has been sent - ensure that future requests
+        # in this test also send the cookie, as this is what browsers do.
+        # We pull it apart and reassemble the header to block cookies
+        # with invalid paths going through, which may or may not be correct
+        if env.has_key('HTTP_COOKIE'):
+            self.cookies.load(env['HTTP_COOKIE'])
+            del env['HTTP_COOKIE'] # Added again in makeRequest
+
+        request = self.makeRequest(path, basic=basic, form=form, env=env,
+                                   outstream=outstream)
+        response = ResponseWrapper(request.response, outstream, path)
+        if env.has_key('HTTP_COOKIE'):
+            self.cookies.load(env['HTTP_COOKIE'])
+        publish(request, handle_errors=handle_errors)
+        # Urgh - need to play with the response's privates to extract
+        # cookies that have been set
+        for k,v in response._cookies.items():
+            k = k.encode('utf8')
+            self.cookies[k] = v['value'].encode('utf8')
+            if self.cookies[k].has_key('Path'):
+                self.cookies[k]['Path'] = v['Path']
+        self.setSite(old_site)
+        return response
+
+    def checkForBrokenLinks(self, body, path, basic=None):
+        """Looks for broken links in a page by trying to traverse relative
+        URIs.
+        """
+        if not body: return
+
+        old_site = self.getSite()
+        self.setSite(None)
+
+        from htmllib import HTMLParser
+        from formatter import NullFormatter
+        class SimpleHTMLParser(HTMLParser):
+            def __init__(self, fmt, base):
+                HTMLParser.__init__(self, fmt)
+                self.base = base
+            def do_base(self, attrs):
+                self.base = dict(attrs).get('href', self.base)
+
+        parser = SimpleHTMLParser(NullFormatter(), path)
+        parser.feed(body)
+        parser.close()
+        base = parser.base
+        while not base.endswith('/'):
+            base = base[:-1]
+        if base.startswith('http://localhost/'):
+            base = base[len('http://localhost/') - 1:]
+
+        errors = []
+        for a in parser.anchorlist:
+            if a.startswith('http://localhost/'):
+                a = a[len('http://localhost/') - 1:]
+            elif a.find(':') != -1:
+                # Assume it is an external link
+                continue
+            elif not a.startswith('/'):
+                a = base + a
+            if a.find('#') != -1:
+                a = a[:a.index('#') - 1]
+            # XXX what about queries (/path/to/foo?bar=baz&etc)?
+            request = None
+            try:
+                try:
+                    request = self.makeRequest(a, basic=basic)
+                    publication = request.publication
+                    request.processInputs()
+                    publication.beforeTraversal(request)
+                    object = publication.getApplication(request)
+                    object = request.traverse(object)
+                    publication.afterTraversal(request, object)
+                except (KeyError, NameError, AttributeError, Unauthorized, Forbidden):
+                    e = traceback.format_exception_only(*sys.exc_info()[:2])[-1]
+                    errors.append((a, e.strip()))
+            finally:
+                publication.endRequest(request, object)
+                self.setSite(old_site)
+                # Bad Things(TM) related to garbage collection and special
+                # __del__ methods happen if request.close() is not called here
+                if request:
+                    request.close()
+        if errors:
+            self.fail("%s contains broken links:\n" % path
+                      + "\n".join(["  %s:\t%s" % (a, e) for a, e in errors]))
+
+
+class HTTPTestCase(FunctionalTestCase):
+    """Functional test case for HTTP requests."""
+
+    def makeRequest(self, path='', basic=None, form=None, env={},
+                    instream=None, outstream=None):
+        """Creates a new request object.
+
+        Arguments:
+          path   -- the path to be traversed (e.g. "/folder1/index.html")
+          basic  -- basic HTTP authentication credentials ("user:password")
+          form   -- a dictionary emulating a form submission
+                    (Note that field values should be Unicode strings)
+          env    -- a dictionary of additional environment variables
+                    (You can emulate HTTP request header
+                       X-Header: foo
+                     by adding 'HTTP_X_HEADER': 'foo' to env)
+          instream  -- a stream from where the HTTP request will be read
+          outstream -- a stream where the HTTP response will be written
+        """
+        if outstream is None:
+            outstream = HTTPTaskStub()
+        if instream is None:
+            instream = ''
+        environment = {"HTTP_HOST": 'localhost',
+                       "HTTP_REFERER": 'localhost'}
+        environment.update(env)
+        app = FunctionalTestSetup().getApplication()
+        request = app._request(path, instream, outstream,
+                               environment=environment,
+                               basic=basic, form=form,
+                               request=HTTPRequest, publication=HTTPPublication)
+        return request
+
+    def publish(self, path, basic=None, form=None, env={},
+                handle_errors=False, request_body=''):
+        """Renders an object at a given location.
+
+        Arguments are the same as in makeRequest with the following exception:
+          handle_errors  -- if False (default), exceptions will not be caught
+                            if True, exceptions will return a formatted error
+                            page.
+
+        Returns the response object enhanced with the following methods:
+          getOutput()    -- returns the full HTTP output as a string
+          getBody()      -- returns the full response body as a string
+          getPath()      -- returns the path used in the request
+        """
+        outstream = HTTPTaskStub()
+        request = self.makeRequest(path, basic=basic, form=form, env=env,
+                                   instream=request_body, outstream=outstream)
+        response = ResponseWrapper(request.response, outstream, path)
+        publish(request, handle_errors=handle_errors)
+        return response
+
+
+class HTTPHeaderOutput:
+
+    zope.interface.implements(zope.publisher.interfaces.http.IHeaderOutput)
+
+    def __init__(self, protocol, omit):
+        self.headers = {}
+        self.headersl = []
+        self.protocol = protocol
+        self.omit = omit
+    
+    def setResponseStatus(self, status, reason):
+        self.status, self.reason = status, reason
+
+    def setResponseHeaders(self, mapping):
+        self.headers.update(dict(
+            [('-'.join([s.capitalize() for s in name.split('-')]), v)
+             for name, v in mapping.items()
+             if name.lower() not in self.omit]
+        ))
+
+    def appendResponseHeaders(self, lst):
+        headers = [split_header(header) for header in lst]
+        self.headersl.extend(
+            [('-'.join([s.capitalize() for s in name.split('-')]), v)
+             for name, v in headers
+             if name.lower() not in self.omit]
+        )
+
+    def __str__(self):
+        out = ["%s: %s" % header for header in self.headers.items()]
+        out.extend(["%s: %s" % header for header in self.headersl])
+        out.sort()
+        out.insert(0, "%s %s %s" % (self.protocol, self.status, self.reason))
+        return '\n'.join(out)
+
+class DocResponseWrapper(ResponseWrapper):
+    """Response Wrapper for use in doc tests
+    """
+
+    def __init__(self, response, outstream, path, header_output):
+        ResponseWrapper.__init__(self, response, outstream, path)
+        self.header_output = header_output
+
+    def __str__(self):
+        body = self.getOutput()
+        if body:
+            return "%s\n\n%s" % (self.header_output, body)
+        return "%s\n" % (self.header_output)
+
+    def getBody(self):
+        return self.getOutput()
+
+def http(request_string, handle_errors=True):
+    """Execute an HTTP request string via the publisher
+
+    This is used for HTTP doc tests.
+    """
+    # Commit work done by previous python code.
+    commit()
+
+    # Discard leading white space to make call layout simpler
+    request_string = request_string.lstrip()
+
+    # split off and parse the command line
+    l = request_string.find('\n')
+    command_line = request_string[:l].rstrip()
+    request_string = request_string[l+1:]
+    method, path, protocol = command_line.split()
+    path = urllib.unquote(path)
+    
+
+    instream = StringIO(request_string)
+    environment = {"HTTP_HOST": 'localhost',
+                   "HTTP_REFERER": 'localhost',
+                   "REQUEST_METHOD": method,
+                   "SERVER_PROTOCOL": protocol,
+                   }
+
+    headers = [split_header(header)
+               for header in rfc822.Message(instream).headers]
+    for name, value in headers:
+        name = ('_'.join(name.upper().split('-')))
+        if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
+            name = 'HTTP_' + name
+        environment[name] = value.rstrip()
+
+    auth_key = 'HTTP_AUTHORIZATION'
+    if environment.has_key(auth_key):
+        environment[auth_key] = auth_header(environment[auth_key])
+
+    outstream = HTTPTaskStub()
+
+
+    old_site = getSite()
+    setSite(None)
+    app = FunctionalTestSetup().getApplication()
+    header_output = HTTPHeaderOutput(
+        protocol, ('x-content-type-warning', 'x-powered-by'))
+
+    if method in ('GET', 'POST', 'HEAD'):
+        if (method == 'POST' and
+            environment.get('CONTENT_TYPE', '').startswith('text/xml')
+            ):
+            request_cls = XMLRPCRequest
+            publication_cls = XMLRPCPublication
+        else:
+            request_cls = type(BrowserRequest.__name__, (BrowserRequest,), {})
+            zope.interface.classImplements(request_cls, _getDefaultSkin())
+            publication_cls = BrowserPublication
+    else:
+        request_cls = HTTPRequest
+        publication_cls = HTTPPublication
+    
+    request = app._request(path, instream, outstream,
+                           environment=environment,
+                           request=request_cls, publication=publication_cls)
+    request.response.setHeaderOutput(header_output)
+    response = DocResponseWrapper(request.response, outstream, path,
+                                  header_output)
+    
+    publish(request, handle_errors=handle_errors)
+    setSite(old_site)
+
+    # sync Python connection:
+    getRootFolder()._p_jar.sync()
+    
+    return response
+
+headerre = re.compile('(\S+): (.+)$')
+def split_header(header):
+    return headerre.match(header).group(1, 2)
+
+basicre = re.compile('Basic (.+)?:(.+)?$')
+def auth_header(header):
+    match = basicre.match(header)
+    if match:
+        import base64
+        u, p = match.group(1, 2)
+        if u is None:
+            u = ''
+        if p is None:
+            p = ''
+        auth = base64.encodestring('%s:%s' % (u, p))
+        return 'Basic %s' % auth[:-1]
+    return header
+
+def getRootFolder():
+    return FunctionalTestSetup().getRootFolder()
+
+def sync():
+    getRootFolder()._p_jar.sync()
+
+#
+# Sample functional test case
+#
+
+class SampleFunctionalTest(BrowserTestCase):
+
+    def testRootPage(self):
+        response = self.publish('/')
+        self.assertEquals(response.getStatus(), 200)
+
+    def testRootPage_preferred_languages(self):
+        response = self.publish('/', env={'HTTP_ACCEPT_LANGUAGE': 'en'})
+        self.assertEquals(response.getStatus(), 200)
+
+    def testNotExisting(self):
+        response = self.publish('/nosuchthing', handle_errors=True)
+        self.assertEquals(response.getStatus(), 404)
+
+    def testLinks(self):
+        response = self.publish('/')
+        self.assertEquals(response.getStatus(), 200)
+        self.checkForBrokenLinks(response.getBody(), response.getPath())
+
+
+def sample_test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(SampleFunctionalTest))
+    return suite
+
+def FunctionalDocFileSuite(*paths, **kw):
+    globs = kw.setdefault('globs', {})
+    globs['http'] = http
+    globs['getRootFolder'] = getRootFolder
+    globs['sync'] = sync
+
+    kw['package'] = doctest._normalize_module(kw.get('package'))
+
+    kwsetUp = kw.get('setUp')
+    def setUp(test):
+        FunctionalTestSetup().setUp()
+        
+        if kwsetUp is not None:
+            kwsetUp(test)
+    kw['setUp'] = setUp
+
+    kwtearDown = kw.get('tearDown')
+    def tearDown(test):
+        if kwtearDown is not None:
+            kwtearDown(test)
+        FunctionalTestSetup().tearDown()
+    kw['tearDown'] = tearDown
+
+    kw['optionflags'] = doctest.ELLIPSIS | doctest.REPORT_NDIFF
+
+    return doctest.DocFileSuite(*paths, **kw)
+
+if __name__ == '__main__':
+    unittest.main()

Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/placelesssetup.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/placelesssetup.py)
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/tests/placelesssetup.py	2004-12-17 21:36:22 UTC (rev 28644)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/placelesssetup.py	2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,69 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Unit test logic for setting up and tearing down basic infrastructure
+
+$Id$
+"""
+from zope.app.testing import ztapi
+from zope.schema.vocabulary import setVocabularyRegistry
+from zope.component.tests.placelesssetup \
+    import PlacelessSetup as CAPlacelessSetup
+from zope.app.event.tests.placelesssetup \
+    import PlacelessSetup as EventPlacelessSetup
+from zope.app.i18n.tests.placelesssetup \
+    import PlacelessSetup as I18nPlacelessSetup
+from zope.app.container.tests.placelesssetup \
+    import PlacelessSetup as ContainerPlacelessSetup
+from zope.app.security._protections import protect
+from zope.app.traversing.browser.interfaces import IAbsoluteURL
+from zope.app.traversing.browser.absoluteurl import AbsoluteURL
+
+class PlacelessSetup(CAPlacelessSetup,
+                     EventPlacelessSetup,
+                     I18nPlacelessSetup,
+                     ContainerPlacelessSetup
+                     ):
+
+    def setUp(self, doctesttest=None):
+        CAPlacelessSetup.setUp(self)
+        ContainerPlacelessSetup.setUp(self)
+        EventPlacelessSetup.setUp(self)
+        I18nPlacelessSetup.setUp(self)
+        # Register app-specific security declarations
+        protect()
+
+        ztapi.browserView(None, 'absolute_url', AbsoluteURL)
+        ztapi.browserViewProviding(None, AbsoluteURL, IAbsoluteURL)
+
+        from zope.app.security.tests import addCheckerPublic
+        addCheckerPublic()
+
+        from zope.security.management import newInteraction
+        newInteraction()
+
+        setVocabularyRegistry(None)
+
+
+ps = PlacelessSetup()
+setUp = ps.setUp
+
+def tearDown():
+    tearDown_ = ps.tearDown
+    def tearDown(doctesttest=None):
+        tearDown_()
+    return tearDown
+
+tearDown = tearDown()
+
+del ps

Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/recorded (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/recorded)

Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/setup.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/setup.py)
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/tests/setup.py	2004-12-17 21:36:22 UTC (rev 28644)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/setup.py	2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,207 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Setting up an environment for testing context-dependent objects
+
+$Id$
+"""
+
+import zope.component
+import zope.interface
+from zope.app import zapi
+from zope.app.testing import ztapi
+from zope.interface import classImplements
+
+#------------------------------------------------------------------------
+# Annotations
+from zope.app.annotation.attribute import AttributeAnnotations
+from zope.app.annotation.interfaces import IAnnotations
+from zope.app.annotation.interfaces import IAttributeAnnotatable
+def setUpAnnotations():
+    ztapi.provideAdapter(IAttributeAnnotatable, IAnnotations,
+                         AttributeAnnotations)
+
+#------------------------------------------------------------------------
+# Dependencies
+from zope.app.dependable import Dependable
+from zope.app.dependable.interfaces import IDependable
+def setUpDependable():
+    ztapi.provideAdapter(IAttributeAnnotatable, IDependable,
+                         Dependable)
+
+#------------------------------------------------------------------------
+# Traversal
+from zope.app.traversing.browser.interfaces import IAbsoluteURL
+from zope.app.container.traversal import ContainerTraversable
+from zope.app.container.interfaces import ISimpleReadContainer
+from zope.app.traversing.interfaces import IContainmentRoot
+from zope.app.traversing.interfaces import IPhysicallyLocatable
+from zope.app.traversing.interfaces import ITraverser, ITraversable
+from zope.app.traversing.adapters import DefaultTraversable
+from zope.app.traversing.adapters import Traverser, RootPhysicallyLocatable
+from zope.app.location.traversing import LocationPhysicallyLocatable
+from zope.app.traversing.namespace import etc
+
+def setUpTraversal():
+    from zope.app.traversing.browser import SiteAbsoluteURL, AbsoluteURL
+
+    ztapi.provideAdapter(None, ITraverser, Traverser)
+    ztapi.provideAdapter(None, ITraversable, DefaultTraversable)
+
+    ztapi.provideAdapter(
+        ISimpleReadContainer, ITraversable, ContainerTraversable)
+    ztapi.provideAdapter(
+        None, IPhysicallyLocatable, LocationPhysicallyLocatable)
+    ztapi.provideAdapter(
+        IContainmentRoot, IPhysicallyLocatable, RootPhysicallyLocatable)
+
+    # set up etc namespace
+    ztapi.provideAdapter(None, ITraversable, etc, name="etc")
+    ztapi.provideView(None, None, ITraversable, "etc", etc)
+
+    ztapi.browserView(None, "absolute_url", AbsoluteURL)
+    ztapi.browserView(IContainmentRoot, "absolute_url", SiteAbsoluteURL)
+
+    ztapi.browserView(None, '', AbsoluteURL, providing=IAbsoluteURL)
+    ztapi.browserView(IContainmentRoot, '', SiteAbsoluteURL,
+                      providing=IAbsoluteURL)
+
+
+#------------------------------------------------------------------------
+# Use registration
+from zope.app.registration.interfaces import IAttributeRegisterable
+from zope.app.registration.interfaces import IRegistered
+from zope.app.registration.registration import Registered
+def setUpRegistered():
+    ztapi.provideAdapter(IAttributeRegisterable, IRegistered,
+                         Registered)
+
+#------------------------------------------------------------------------
+# Service service lookup
+from zope.app.component.localservice import serviceServiceAdapter
+from zope.app.registration.interfaces import IRegistrationActivatedEvent
+from zope.app.registration.interfaces import IRegistrationDeactivatedEvent
+from zope.app.site.service import handleActivated, handleDeactivated
+from zope.component.interfaces import IServiceService
+from zope.interface import Interface
+def setUpServiceService():
+    ztapi.subscribe((IRegistrationActivatedEvent,), None, handleActivated)
+    ztapi.subscribe((IRegistrationDeactivatedEvent,), None, handleDeactivated)
+    ztapi.provideAdapter(Interface, IServiceService, serviceServiceAdapter)
+
+#------------------------------------------------------------------------
+# Placeful setup
+import zope.app.component.hooks
+from zope.app.testing.placelesssetup import setUp as placelessSetUp
+from zope.app.testing.placelesssetup import tearDown as placelessTearDown
+def placefulSetUp(site=False):
+    placelessSetUp()
+    zope.app.component.hooks.setHooks()
+    setUpAnnotations()
+    setUpDependable()
+    setUpTraversal()
+    setUpRegistered()
+    setUpServiceService()
+
+    if site:
+        site = rootFolder()
+        createServiceManager(site, setsite=True)
+        return site
+
+from zope.app.component.hooks import setSite
+def placefulTearDown():
+    placelessTearDown()
+    zope.app.component.hooks.resetHooks()
+    setSite()
+
+
+from zope.app.folder import Folder, rootFolder
+
+def buildSampleFolderTree():
+    # set up a reasonably complex folder structure
+    #
+    #     ____________ rootFolder ____________
+    #    /                                    \
+    # folder1 __________________            folder2
+    #   |                       \             |
+    # folder1_1 ____           folder1_2    folder2_1
+    #   |           \            |            |
+    # folder1_1_1 folder1_1_2  folder1_2_1  folder2_1_1
+
+    root = rootFolder()
+    root['folder1'] = Folder()
+    root['folder1']['folder1_1'] = Folder()
+    root['folder1']['folder1_1']['folder1_1_1'] = Folder()
+    root['folder1']['folder1_1']['folder1_1_2'] = Folder()
+    root['folder1']['folder1_2'] = Folder()
+    root['folder1']['folder1_2']['folder1_2_1'] = Folder()
+    root['folder2'] = Folder()
+    root['folder2']['folder2_1'] = Folder()
+    root['folder2']['folder2_1']['folder2_1_1'] = Folder()
+
+    return root
+
+
+from zope.app.site.service import ServiceManager
+from zope.app.site.interfaces import ISite
+def createServiceManager(folder, setsite=False):
+    if not ISite.providedBy(folder):
+        folder.setSiteManager(ServiceManager(folder))
+    if setsite:
+        setSite(folder)
+    return zapi.traverse(folder, "++etc++site")
+
+from zope.app.site.service import ServiceRegistration
+from zope.app.site.interfaces import ISimpleService
+from zope.app.registration.interfaces import ActiveStatus
+
+def addService(servicemanager, name, service, suffix=''):
+    """Add a service to a service manager
+
+    This utility is useful for tests that need to set up services.
+    """
+    # Most local services implement ISimpleService in ZCML; therefore make
+    # sure we got it here as well.
+    zope.interface.directlyProvides(service, ISimpleService)
+
+    default = zapi.traverse(servicemanager, 'default')
+    default[name+suffix] = service
+    registration = ServiceRegistration(name, service, default)
+    key = default.getRegistrationManager().addRegistration(registration)
+    zapi.traverse(default.getRegistrationManager(), key).status = ActiveStatus
+    return default[name+suffix]
+
+from zope.app.utility import UtilityRegistration
+
+def addUtility(servicemanager, name, iface, utility, suffix=''):
+    """Add a utility to a service manager
+
+    This utility is useful for tests that need to set up utilities.
+    """
+    
+    folder_name = (name or (iface.__name__ + 'Utility')) + suffix
+    default = zapi.traverse(servicemanager, 'default')
+    default[folder_name] = utility
+    registration = UtilityRegistration(name, iface, default[folder_name])
+    key = default.getRegistrationManager().addRegistration(registration)
+    zapi.traverse(default.getRegistrationManager(), key).status = ActiveStatus
+    return default[folder_name]
+
+def createStandardServices(folder):
+    '''Create a bunch of standard placeful services
+
+    Well, uh, 0
+    '''
+    sm = createServiceManager(folder)
+
+

Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/test.py (from rev 28645, Zope3/branches/srichter-blow-services/src/zope/app/tests/test.py)

Added: Zope3/branches/srichter-blow-services/src/zope/app/testing/tests.py
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/testing/tests.py	2004-12-21 16:39:25 UTC (rev 28668)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/tests.py	2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,205 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test tcpdoc
+
+$Id$
+"""
+import os
+import unittest
+import StringIO
+
+from zope.testing.doctestunit import DocTestSuite
+
+import zope.app.testing
+from zope.app.testing import functional
+from zope.app.testing.dochttp import dochttp
+
+HEADERS = """\
+HTTP/1.1 200 Ok
+Content-Type: text/plain
+"""
+
+BODY = """\
+This is the response body.
+"""
+
+directory = os.path.join(os.path.split(zope.app.testing.__file__)[0],
+                         'recorded')
+
+expected = r'''
+
+  >>> print http(r"""
+  ... GET /@@contents.html HTTP/1.1
+  ... """)
+  HTTP/1.1 401 Unauthorized
+  Content-Length: 89
+  Content-Type: text/html;charset=utf-8
+  Www-Authenticate: basic realm=zope
+  <BLANKLINE>
+  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+        lang="en">
+  <BLANKLINE>
+  ...
+  <BLANKLINE>
+  </html>
+  <BLANKLINE>
+  <BLANKLINE>
+
+
+  >>> print http(r"""
+  ... GET /@@contents.html HTTP/1.1
+  ... Authorization: Basic bWdyOm1ncnB3
+  ... """)
+  HTTP/1.1 200 Ok
+  Content-Length: 89
+  Content-Type: text/html;charset=utf-8
+  <BLANKLINE>
+  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+        lang="en">
+  <BLANKLINE>
+  ...
+  <BLANKLINE>
+  </html>
+  <BLANKLINE>
+  <BLANKLINE>
+
+
+  >>> print http(r"""
+  ... GET /++etc++site/@@manage HTTP/1.1
+  ... Authorization: Basic bWdyOm1ncnB3
+  ... Referer: http://localhost:8081/
+  ... """)
+  HTTP/1.1 303 See Other
+  Content-Length: 0
+  Content-Type: text/plain;charset=utf-8
+  Location: @@tasks.html
+  <BLANKLINE>
+
+
+  >>> print http(r"""
+  ... GET / HTTP/1.1
+  ... Authorization: Basic bWdyOm1ncnB3
+  ... """)
+  HTTP/1.1 200 Ok
+  Content-Length: 89
+  Content-Type: text/html;charset=utf-8
+  <BLANKLINE>
+  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+        lang="en">
+  <BLANKLINE>
+  ...
+  <BLANKLINE>
+  </html>
+  <BLANKLINE>
+  <BLANKLINE>
+
+
+  >>> print http(r"""
+  ... GET /++etc++site/@@tasks.html HTTP/1.1
+  ... Authorization: Basic bWdyOm1ncnB3
+  ... Referer: http://localhost:8081/
+  ... """)
+  HTTP/1.1 200 Ok
+  Content-Length: 89
+  Content-Type: text/html;charset=utf-8
+  <BLANKLINE>
+  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+        lang="en">
+  <BLANKLINE>
+  ...
+  <BLANKLINE>
+  </html>
+  <BLANKLINE>
+  <BLANKLINE>
+'''
+      
+class FunctionalHTTPDocTest(unittest.TestCase):
+
+    def test_dochttp(self):
+        import sys, StringIO
+        old = sys.stdout
+        sys.stdout = StringIO.StringIO()
+        dochttp(['-p', 'test', directory])
+        got = sys.stdout.getvalue()
+        sys.stdout = old
+        self.assert_(got == expected)
+
+
+class DocResponseWrapperTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.body_output = StringIO.StringIO()
+        self.path = "/foo/bar/"
+        self.response = object()
+
+        self.wrapper = functional.DocResponseWrapper(
+            self.response, self.body_output, self.path, HEADERS)
+
+    def test__str__(self):
+        self.assertEqual(str(self.wrapper),
+                         HEADERS + "\n")
+        self.body_output.write(BODY)
+        self.assertEqual(str(self.wrapper),
+                         "%s\n\n%s" % (HEADERS, BODY))
+
+    def test_getBody(self):
+        self.assertEqual(self.wrapper.getBody(), "")
+        self.body_output.write(BODY)
+        self.assertEqual(self.wrapper.getBody(), BODY)
+
+    def test_getOutput(self):
+        self.assertEqual(self.wrapper.getOutput(), "")
+        self.body_output.write(BODY)
+        self.assertEqual(self.wrapper.getOutput(), BODY)
+
+
+class AuthHeaderTestCase(unittest.TestCase):
+
+    def test_auth_encoded(self):
+        auth_header = functional.auth_header
+        header = 'Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3'
+        self.assertEquals(auth_header(header), header)
+
+    def test_auth_non_encoded(self):
+        auth_header = functional.auth_header
+        header = 'Basic globalmgr:globalmgrpw'
+        expected = 'Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3'
+        self.assertEquals(auth_header(header), expected)
+
+    def test_auth_non_encoded_empty(self):
+        auth_header = functional.auth_header
+        header = 'Basic globalmgr:'
+        expected = 'Basic Z2xvYmFsbWdyOg=='
+        self.assertEquals(auth_header(header), expected)
+        header = 'Basic :pass'
+        expected = 'Basic OnBhc3M='
+        self.assertEquals(auth_header(header), expected)
+
+    def test_auth_non_encoded_colon(self):
+        auth_header = zope.app.testing.functional.auth_header
+        header = 'Basic globalmgr:pass:pass'
+        expected = 'Basic Z2xvYmFsbWdyOnBhc3M6cGFzcw=='
+        self.assertEquals(auth_header(header), expected)
+
+
+def test_suite():
+    return unittest.TestSuite((
+        unittest.makeSuite(FunctionalHTTPDocTest),
+        unittest.makeSuite(DocResponseWrapperTestCase),
+        unittest.makeSuite(AuthHeaderTestCase)
+        ))
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')
+

Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/ztapi.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/ztapi.py)
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/tests/ztapi.py	2004-12-17 21:36:22 UTC (rev 28644)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/ztapi.py	2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,101 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Testing helper functions
+
+$Id$
+"""
+import zope.interface
+from zope.component.interfaces import IDefaultViewName
+from zope.publisher.browser import IBrowserRequest
+from zope.publisher.interfaces.browser import IDefaultBrowserLayer
+from zope.app import zapi
+from zope.app.traversing.interfaces import ITraversable
+
+def provideView(for_, type, providing, name, factory, layer=None):
+    if layer is None:
+        layer = type
+    provideAdapter(for_, providing, factory, name, (layer,))    
+
+def provideMultiView(for_, type, providing, name, factory, layer=None):
+    if layer is None:
+        layer = type
+    provideAdapter(for_[0], providing, factory, name, tuple(for_[1:])+(layer,))
+
+def browserView(for_, name, factory, layer=IDefaultBrowserLayer,
+                providing=zope.interface.Interface):
+    """Define a global browser view
+    """
+    if isinstance(factory, (list, tuple)):
+        raise ValueError("Factory cannot be a list or tuple")
+    provideAdapter(for_, providing, factory, name, (layer,))
+
+def browserViewProviding(for_, factory, providing, layer=IDefaultBrowserLayer):
+    """Define a view providing a particular interface."""
+    if isinstance(factory, (list, tuple)):
+        raise ValueError("Factory cannot be a list or tuple")
+    return browserView(for_, '', factory, layer, providing)
+
+def browserResource(name, factory, layer=IDefaultBrowserLayer,
+                    providing=zope.interface.Interface):
+    """Define a global browser view
+    """
+    if isinstance(factory, (list, tuple)):
+        raise ValueError("Factory cannot be a list or tuple")
+    provideAdapter((layer,), providing, factory, name)
+
+def setDefaultViewName(for_, name, layer=IDefaultBrowserLayer,
+                       type=IBrowserRequest):
+    if layer is None:
+        layer = type
+    gsm = zapi.getGlobalSiteManager()
+    gsm.provideAdapter((for_, layer), IDefaultViewName, '', name)
+
+stypes = list, tuple
+def provideAdapter(required, provided, factory, name='', with=()):
+    if isinstance(factory, (list, tuple)):
+        raise ValueError("Factory cannot be a list or tuple")
+    gsm = zapi.getGlobalSiteManager()
+
+    if with:
+        required = (required, ) + tuple(with)
+    elif not isinstance(required, stypes):
+        required = (required,)
+
+    gsm.provideAdapter(required, provided, name, factory)
+
+def subscribe(required, provided, factory):
+    gsm = zapi.getGlobalSiteManager()
+    gsm.subscribe(required, provided, factory)
+
+def handle(required, handler):
+    subscribe(required, None, handler)
+
+def provideUtility(provided, component, name=''):
+    gsm = zapi.getGlobalSiteManager()
+    gsm.provideUtility(provided, component, name)
+
+def unprovideUtility(provided, name=''):
+    gsm = zapi.getGlobalSiteManager()
+    gsm.provideAdapter((), provided, name, None)
+
+def provideNamespaceHandler(name, handler):
+    provideAdapter(None, ITraversable, handler, name=name)
+    provideView(None, None, ITraversable, name, handler)
+
+def provideService(name, service, interface=None):
+    services = zapi.getGlobalServices()
+    if interface is not None:
+        services.defineService(name, interface)
+    services.provideService(name, service)
+    



More information about the Zope3-Checkins mailing list