[Checkins] SVN: zope.app.testing/tags/3.4.3/ Prepare for release

Marius Gedminas marius at pov.lt
Fri Jul 25 13:24:03 EDT 2008


Log message for revision 88824:
  Prepare for release

Changed:
  A   zope.app.testing/tags/3.4.3/
  D   zope.app.testing/tags/3.4.3/CHANGES.txt
  A   zope.app.testing/tags/3.4.3/CHANGES.txt
  U   zope.app.testing/tags/3.4.3/buildout.cfg
  D   zope.app.testing/tags/3.4.3/src/zope/app/testing/functional.py
  A   zope.app.testing/tags/3.4.3/src/zope/app/testing/functional.py
  D   zope.app.testing/tags/3.4.3/src/zope/app/testing/tests.py
  A   zope.app.testing/tags/3.4.3/src/zope/app/testing/tests.py

-=-
Copied: zope.app.testing/tags/3.4.3 (from rev 88814, zope.app.testing/trunk)

Deleted: zope.app.testing/tags/3.4.3/CHANGES.txt
===================================================================
--- zope.app.testing/trunk/CHANGES.txt	2008-07-25 12:41:50 UTC (rev 88814)
+++ zope.app.testing/tags/3.4.3/CHANGES.txt	2008-07-25 17:24:01 UTC (rev 88824)
@@ -1,19 +0,0 @@
-=======
-CHANGES
-=======
-
-3.4.2 (2008-02-02)
-------------------
-
-- Fix of 599 error on conflict error in request
-  see: http://mail.zope.org/pipermail/zope-dev/2008-January/030844.html
-
-3.4.1 (2007-10-31)
-------------------
-
-- Fixed deprecation warning for ``ZopeSecurityPolicy``.
-
-3.4.0 (2007-10-27)
-------------------
-
-- Initial release independent of the main Zope tree.

Copied: zope.app.testing/tags/3.4.3/CHANGES.txt (from rev 88823, zope.app.testing/trunk/CHANGES.txt)
===================================================================
--- zope.app.testing/tags/3.4.3/CHANGES.txt	                        (rev 0)
+++ zope.app.testing/tags/3.4.3/CHANGES.txt	2008-07-25 17:24:01 UTC (rev 88824)
@@ -0,0 +1,25 @@
+=======
+CHANGES
+=======
+
+3.4.3 (2008-07-25)
+------------------
+
+- Fix memory leak in all functional tests.
+  see: https://bugs.launchpad.net/zope3/+bug/251273
+
+3.4.2 (2008-02-02)
+------------------
+
+- Fix of 599 error on conflict error in request
+  see: http://mail.zope.org/pipermail/zope-dev/2008-January/030844.html
+
+3.4.1 (2007-10-31)
+------------------
+
+- Fixed deprecation warning for ``ZopeSecurityPolicy``.
+
+3.4.0 (2007-10-27)
+------------------
+
+- Initial release independent of the main Zope tree.

Modified: zope.app.testing/tags/3.4.3/buildout.cfg
===================================================================
--- zope.app.testing/trunk/buildout.cfg	2008-07-25 12:41:50 UTC (rev 88814)
+++ zope.app.testing/tags/3.4.3/buildout.cfg	2008-07-25 17:24:01 UTC (rev 88824)
@@ -1,8 +1,11 @@
 [buildout]
-develop = .
-parts = test
+develop = . ../zope.component
+parts = test tags
 
 [test]
 recipe = zc.recipe.testrunner
 eggs = zope.app.testing [test]
 
+[tags]
+recipe = z3c.recipe.tag:tags
+eggs = zope.app.testing

Deleted: zope.app.testing/tags/3.4.3/src/zope/app/testing/functional.py
===================================================================
--- zope.app.testing/trunk/src/zope/app/testing/functional.py	2008-07-25 12:41:50 UTC (rev 88814)
+++ zope.app.testing/tags/3.4.3/src/zope/app/testing/functional.py	2008-07-25 17:24:01 UTC (rev 88824)
@@ -1,763 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2003 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Functional testing framework for Zope 3.
-
-There should be a file 'ftesting.zcml' in the current directory.
-
-$Id$
-"""
-import logging
-import os.path
-import re
-import rfc822
-import sys
-import traceback
-import unittest
-from StringIO import StringIO
-from Cookie import SimpleCookie
-from transaction import abort, commit
-from ZODB.DB import DB
-from ZODB.DemoStorage import DemoStorage
-
-from zope import component
-from zope.publisher.browser import BrowserRequest, setDefaultSkin
-from zope.publisher.http import HTTPRequest
-from zope.publisher.publish import publish
-from zope.security.interfaces import Forbidden, Unauthorized
-from zope.testing import doctest
-
-import zope.app.testing.setup
-from zope.app.appsetup.appsetup import multi_database
-from zope.app.debug import Debugger
-from zope.app.publication.http import HTTPPublication
-from zope.app.publication.zopepublication import ZopePublication
-from zope.app.publication.http import HTTPPublication
-from zope.app.publication.httpfactory import chooseClasses
-from zope.publisher.interfaces.browser import IBrowserRequest
-from zope.app.component.hooks import setSite, getSite
-
-
-class ResponseWrapper(object):
-    """A wrapper that adds several introspective methods to a response."""
-
-    def __init__(self, response, path, omit=()):
-        self._response = response
-        self._path = path
-        self.omit = omit
-        self._body = None
-
-    def getOutput(self):
-        """Returns the full HTTP output (headers + body)"""
-        body = self.getBody()
-        omit = self.omit
-        headers = [x
-                   for x in self._response.getHeaders()
-                   if x[0].lower() not in omit]
-        headers.sort()
-        headers = '\n'.join([("%s: %s" % (n, v)) for (n, v) in headers])
-        statusline = '%s %s' % (self._response._request['SERVER_PROTOCOL'],
-                                self._response.getStatusString())
-        if body:
-            return '%s\n%s\n\n%s' %(statusline, headers, body)
-        else:
-            return '%s\n%s\n' % (statusline, headers)
-
-    def getBody(self):
-        """Returns the response body"""
-        if self._body is None:
-            self._body = ''.join(self._response.consumeBody())
-
-        return self._body
-
-    def getPath(self):
-        """Returns the path of the request"""
-        return self._path
-
-    def __getattr__(self, attr):
-        return getattr(self._response, attr)
-
-    __str__ = getOutput
-
-
-class IManagerSetup(zope.interface.Interface):
-    """Utility for enabling up a functional testing manager with needed grants
-
-    TODO This is an interim solution.  It tries to break the dependence
-    on a particular security policy, however, we need a much better
-    way of managing functional-testing configurations.
-    """
-
-    def setUpManager():
-        """Set up the manager, zope.mgr
-        """
-
-class BaseDatabaseFactory(object):
-    """Factory object for passing to appsetup.multi_databases
-
-    This class is an internal implementation detail, subject to change
-    without notice!
-
-    It is currently used by FunctionalTestSetUp.__init__ to create the
-    basic storage(s) containing the data that is common to all tests
-    in a layer.
-
-    The constructor takes the name of the new database, and a
-    dictionary of storages.  The 'open' method creates a new
-    DemoStorage, and adds it to the storage dictionary under the given
-    name. Then creates and returns a named DB object using the
-    storage.
-    """
-
-    def __init__(self, name, base_storages):
-        self.name = name
-        self.base_storages = base_storages
-
-    def open(self):
-        name = self.name
-        if name in self.base_storages:
-            raise ValueError("Duplicate database name: %r" % name)
-        storage = DemoStorage("Memory storage %r" % name)
-        self.base_storages[name] = storage
-        return DB(storage, database_name=name)
-
-
-class DerivedDatabaseFactory(object):
-    """Factory object for passing to appsetup.multi_databases
-
-    This class is an internal implementation detail, subject to change
-    without notice!
-
-    It is currently used by FunctionalTestSetUp.setUp to create the
-    derived storage(s) used for each test in a layer.
-
-    The constructor takes the name of the new database, and a
-    dictionary of storages.  The 'open' method creates a new
-    DemoStorage as a wrapper around the storage with the given
-    name. Then creates and returns a named DB object using the
-    storage.
-    """
-
-    def __init__(self, name, base_storages):
-        self.name = name
-        self.storage = DemoStorage("Demo storage %r" % name,
-                                   base_storages[name])
-
-    def open(self):
-        return DB(self.storage, database_name=self.name)
-
-
-class FunctionalTestSetup(object):
-    """Keeps shared state across several functional test cases."""
-
-    __shared_state = { '_init': False }
-
-    def __init__(self, config_file=None, database_names=None):
-        """Initializes Zope 3 framework.
-
-        Creates a volatile memory storage.  Parses Zope3 configuration files.
-        """
-        self.__dict__ = self.__shared_state
-
-        if database_names is not None:
-            database_names = tuple(database_names)
-
-        if not self._init:
-
-            # Make sure unit tests are cleaned up
-            zope.app.testing.setup.placefulSetUp()
-            zope.app.testing.setup.placefulTearDown()
-
-            if not config_file:
-                config_file = 'ftesting.zcml'
-            if database_names is None:
-                database_names = ('unnamed',)
-            self.log = StringIO()
-            # Make it silent but keep the log available for debugging
-            logging.root.addHandler(logging.StreamHandler(self.log))
-
-            self._base_storages = {}
-            self.db = multi_database(
-                BaseDatabaseFactory(name, self._base_storages)
-                for name in database_names
-                )[0][0]
-            self.app = Debugger(self.db, config_file)
-
-            self.connection = None
-            self._config_file = config_file
-            self._database_names = database_names
-            self._init = True
-
-            # Make a local grant for the test user
-            setup = component.queryUtility(IManagerSetup)
-            if setup is not None:
-                setup.setUpManager()
-
-            FunctionalTestSetup().connection = None
-
-        elif config_file and config_file != self._config_file:
-            # Running different tests with different configurations is not
-            # supported at the moment
-            raise NotImplementedError('Already configured'
-                                      ' with a different config file')
-
-        elif database_names and database_names != self._database_names:
-            # Running different tests with different configurations is not
-            # supported at the moment
-            raise NotImplementedError('Already configured'
-                                      ' with different database names')
-
-    # BBB: Simulate the old base_storage attribute, but only when not using
-    # multiple databases. There *is* code in the wild that uses the attribute.
-    def _get_base_storage(self):
-        if len(self._database_names)!=1:
-            raise AttributeError('base_storage')
-        return self._base_storages[self._database_names[0]]
-
-    def _set_base_storage(self, value):
-        if len(self._database_names)!=1:
-            raise AttributeError('base_storage')
-        self._base_storages[self._database_names[0]] = value
-
-    base_storage = property(_get_base_storage, _set_base_storage)
-
-    def setUp(self):
-        """Prepares for a functional test case."""
-        # Tear down the old demo storages (if any) and create fresh ones
-        abort()
-        for db in self.db.databases.itervalues():
-            db.close()
-        self.db = self.app.db = multi_database(
-            DerivedDatabaseFactory(name, self._base_storages)
-            for name in self._database_names
-            )[0][0]
-        self.connection = None
-
-    def tearDown(self):
-        """Cleans up after a functional test case."""
-        abort()
-        if self.connection:
-            self.connection.close()
-            self.connection = None
-        for db in self.db.databases.itervalues():
-            db.close()
-        setSite(None)
-
-    def tearDownCompletely(self):
-        """Cleans up the setup done by the constructor."""
-        zope.app.testing.setup.placefulTearDown()
-        self._config_file = False
-        self._database_names = None
-        self._init = False
-
-    def getRootFolder(self):
-        """Returns the Zope root folder."""
-        if not self.connection:
-            self.connection = self.db.open()
-        root = self.connection.root()
-        return root[ZopePublication.root_name]
-
-    def getApplication(self):
-        """Returns the Zope application instance."""
-        return self.app
-
-
-class ZCMLLayer:
-    """ZCML-defined test layer
-    """
-
-    __bases__ = ()
-
-    def __init__(self, config_file, module, name, allow_teardown=False):
-        self.config_file = config_file
-        self.__module__ = module
-        self.__name__ = name
-        self.allow_teardown = allow_teardown
-
-    def setUp(self):
-        self.setup = FunctionalTestSetup(self.config_file)
-
-    def tearDown(self):
-        self.setup.tearDownCompletely()
-        if not self.allow_teardown:
-            # Some ZCML directives change globals but are not accompanied
-            # with registered CleanUp handlers to undo the changes.  Let
-            # packages which use such directives indicate that they do not
-            # support tearing down.
-            raise NotImplementedError
-
-
-def defineLayer(name, zcml='test.zcml', allow_teardown=False):
-    """Helper function for defining layers.
-
-    Usage: defineLayer('foo')
-    """
-    globals = sys._getframe(1).f_globals
-    globals[name] = ZCMLLayer(
-        os.path.join(os.path.split(globals['__file__'])[0], zcml),
-        globals['__name__'],
-        name,
-        allow_teardown=allow_teardown,
-        )
-
-if os.path.exists(os.path.join('zopeskel', 'etc', 'ftesting.zcml')):
-    Functional = os.path.join('zopeskel', 'etc', 'ftesting.zcml')
-    FunctionalNoDevMode = os.path.join('zopeskel', 'etc', 'ftesting-base.zcml')
-elif os.path.exists(os.path.join('etc', 'ftesting.zcml')):
-    Functional = os.path.join('etc', 'ftesting.zcml')
-    FunctionalNoDevMode = os.path.join('etc', 'ftesting-base.zcml')
-else:
-    # let's hope that the file is in our CWD. If not, we'll get an
-    # error anyways, but we can't just throw an error if we don't find
-    # that file. This module might be imported for other things as
-    # well, not only starting up Zope from ftesting.zcml.
-    Functional = 'ftesting.zcml'
-    FunctionalNoDevMode = 'ftesting-base.zcml'
-
-Functional = os.path.abspath(Functional)
-FunctionalNoDevMode = os.path.abspath(FunctionalNoDevMode)
-
-Functional = ZCMLLayer(Functional, __name__, 'Functional')
-FunctionalNoDevMode = ZCMLLayer(FunctionalNoDevMode, __name__,
-                                'FunctionalNoDevMode')
-
-class FunctionalTestCase(unittest.TestCase):
-    """Functional test case."""
-
-    layer = Functional
-
-    def setUp(self):
-        """Prepares for a functional test case."""
-        super(FunctionalTestCase, self).setUp()
-        FunctionalTestSetup().setUp()
-
-    def tearDown(self):
-        """Cleans up after a functional test case."""
-        FunctionalTestSetup().tearDown()
-        super(FunctionalTestCase, self).tearDown()
-
-    def getRootFolder(self):
-        """Returns the Zope root folder."""
-        return FunctionalTestSetup().getRootFolder()
-
-    def commit(self):
-        commit()
-
-    def abort(self):
-        abort()
-
-
-class CookieHandler(object):
-
-    def __init__(self, *args, **kw):
-        # Somewhere to store cookies between consecutive requests
-        self.cookies = SimpleCookie()
-        super(CookieHandler, self).__init__(*args, **kw)
-
-
-    def httpCookie(self, path):
-         """Return self.cookies as an HTTP_COOKIE environment value."""
-         l = [m.OutputString().split(';')[0] for m in self.cookies.values()
-              if path.startswith(m['path'])]
-         return '; '.join(l)
-
-    def loadCookies(self, envstring):
-        self.cookies.load(envstring)
-
-    def saveCookies(self, response):
-        """Save cookies from the response."""
-        # Urgh - need to play with the response's privates to extract
-        # cookies that have been set
-        # TODO: extend the IHTTPRequest interface to allow access to all
-        # cookies
-        # TODO: handle cookie expirations
-        for k,v in response._cookies.items():
-            k = k.encode('utf8')
-            self.cookies[k] = v['value'].encode('utf8')
-            if v.has_key('path'):
-                self.cookies[k]['path'] = v['path']
-
-
-class BrowserTestCase(CookieHandler, FunctionalTestCase):
-    """Functional test case for Browser requests."""
-
-    def setSite(self, site):
-        """Set the site which will be used to look up local components"""
-        setSite(site)
-
-    def getSite(self):
-        """Returns the site which is used to look up local components"""
-        return getSite()
-
-    def makeRequest(self, path='', basic=None, form=None, env={}):
-        """Creates a new request object.
-
-        Arguments:
-          path   -- the path to be traversed (e.g. "/folder1/index.html")
-          basic  -- basic HTTP authentication credentials ("user:password")
-          form   -- a dictionary emulating a form submission
-                    (Note that field values should be Unicode strings)
-          env    -- a dictionary of additional environment variables
-                    (You can emulate HTTP request header
-                       X-Header: foo
-                     by adding 'HTTP_X_HEADER': 'foo' to env)
-        """
-        environment = {"HTTP_HOST": 'localhost',
-                       "HTTP_REFERER": 'localhost',
-                       "HTTP_COOKIE": self.httpCookie(path)}
-        environment.update(env)
-        app = FunctionalTestSetup().getApplication()
-        request = app._request(path, '',
-                               environment=environment,
-                               basic=basic, form=form,
-                               request=BrowserRequest)
-        setDefaultSkin(request)
-        return request
-
-    def publish(self, path, basic=None, form=None, env={},
-                handle_errors=False):
-        """Renders an object at a given location.
-
-        Arguments are the same as in makeRequest with the following exception:
-          handle_errors  -- if False (default), exceptions will not be caught
-                            if True, exceptions will return a formatted error
-                            page.
-
-        Returns the response object enhanced with the following methods:
-          getOutput()    -- returns the full HTTP output as a string
-          getBody()      -- returns the full response body as a string
-          getPath()      -- returns the path used in the request
-        """
-        old_site = self.getSite()
-        self.setSite(None)
-        # A cookie header has been sent - ensure that future requests
-        # in this test also send the cookie, as this is what browsers do.
-        # We pull it apart and reassemble the header to block cookies
-        # with invalid paths going through, which may or may not be correct
-        if env.has_key('HTTP_COOKIE'):
-            self.loadCookies(env['HTTP_COOKIE'])
-            del env['HTTP_COOKIE'] # Added again in makeRequest
-
-        request = self.makeRequest(path, basic=basic, form=form, env=env)
-        if env.has_key('HTTP_COOKIE'):
-            self.loadCookies(env['HTTP_COOKIE'])
-
-        request = publish(request, handle_errors=handle_errors)
-
-        response = ResponseWrapper(request.response, path)
-
-        self.saveCookies(response)
-        self.setSite(old_site)
-        return response
-
-    def checkForBrokenLinks(self, body, path, basic=None):
-        """Looks for broken links in a page by trying to traverse relative
-        URIs.
-        """
-        if not body: return
-
-        old_site = self.getSite()
-        self.setSite(None)
-
-        from htmllib import HTMLParser
-        from formatter import NullFormatter
-        class SimpleHTMLParser(HTMLParser):
-            def __init__(self, fmt, base):
-                HTMLParser.__init__(self, fmt)
-                self.base = base
-            def do_base(self, attrs):
-                self.base = dict(attrs).get('href', self.base)
-
-        parser = SimpleHTMLParser(NullFormatter(), path)
-        parser.feed(body)
-        parser.close()
-        base = parser.base
-        while not base.endswith('/'):
-            base = base[:-1]
-        if base.startswith('http://localhost/'):
-            base = base[len('http://localhost/') - 1:]
-
-        errors = []
-        for a in parser.anchorlist:
-            if a.startswith('http://localhost/'):
-                a = a[len('http://localhost/') - 1:]
-            elif a.find(':') != -1:
-                # Assume it is an external link
-                continue
-            elif not a.startswith('/'):
-                a = base + a
-            if a.find('#') != -1:
-                a = a[:a.index('#')]
-            # ??? what about queries (/path/to/foo?bar=baz&etc)?
-            request = None
-            try:
-                try:
-                    request = self.makeRequest(a, basic=basic)
-                    publication = request.publication
-                    request.processInputs()
-                    publication.beforeTraversal(request)
-                    object = publication.getApplication(request)
-                    object = request.traverse(object)
-                    publication.afterTraversal(request, object)
-                except (KeyError, NameError, AttributeError, Unauthorized,
-                        Forbidden):
-                    e = traceback.format_exception_only(
-                        *sys.exc_info()[:2])[-1]
-                    errors.append((a, e.strip()))
-            finally:
-                publication.endRequest(request, object)
-                self.setSite(old_site)
-
-                # Make sure we don't have pending changes
-                abort()
-
-                # The request should always be closed to free resources
-                # held by the request
-                if request:
-                    request.close()
-        if errors:
-            self.fail("%s contains broken links:\n" % path
-                      + "\n".join(["  %s:\t%s" % (a, e) for a, e in errors]))
-
-
-class HTTPTestCase(FunctionalTestCase):
-    """Functional test case for HTTP requests."""
-
-    def makeRequest(self, path='', basic=None, form=None, env={},
-                    instream=None):
-        """Creates a new request object.
-
-        Arguments:
-          path   -- the path to be traversed (e.g. "/folder1/index.html")
-          basic  -- basic HTTP authentication credentials ("user:password")
-          form   -- a dictionary emulating a form submission
-                    (Note that field values should be Unicode strings)
-          env    -- a dictionary of additional environment variables
-                    (You can emulate HTTP request header
-                       X-Header: foo
-                     by adding 'HTTP_X_HEADER': 'foo' to env)
-          instream  -- a stream from where the HTTP request will be read
-        """
-        if instream is None:
-            instream = ''
-        environment = {"HTTP_HOST": 'localhost',
-                       "HTTP_REFERER": 'localhost'}
-        environment.update(env)
-        app = FunctionalTestSetup().getApplication()
-        request = app._request(path, instream,
-                               environment=environment,
-                               basic=basic, form=form,
-                               request=HTTPRequest, publication=HTTPPublication)
-        return request
-
-    def publish(self, path, basic=None, form=None, env={},
-                handle_errors=False, request_body=''):
-        """Renders an object at a given location.
-
-        Arguments are the same as in makeRequest with the following exception:
-          handle_errors  -- if False (default), exceptions will not be caught
-                            if True, exceptions will return a formatted error
-                            page.
-
-        Returns the response object enhanced with the following methods:
-          getOutput()    -- returns the full HTTP output as a string
-          getBody()      -- returns the full response body as a string
-          getPath()      -- returns the path used in the request
-        """
-        request = self.makeRequest(path, basic=basic, form=form, env=env,
-                                   instream=request_body)
-        response = ResponseWrapper(request.response, path)
-        publish(request, handle_errors=handle_errors)
-        return response
-
-
-headerre = re.compile(r'(\S+): (.+)$')
-def split_header(header):
-    return headerre.match(header).group(1, 2)
-
-basicre = re.compile('Basic (.+)?:(.+)?$')
-def auth_header(header):
-    match = basicre.match(header)
-    if match:
-        import base64
-        u, p = match.group(1, 2)
-        if u is None:
-            u = ''
-        if p is None:
-            p = ''
-        auth = base64.encodestring('%s:%s' % (u, p))
-        return 'Basic %s' % auth[:-1]
-    return header
-
-def getRootFolder():
-    return FunctionalTestSetup().getRootFolder()
-
-def sync():
-    getRootFolder()._p_jar.sync()
-
-#
-# Sample functional test case
-#
-
-class SampleFunctionalTest(BrowserTestCase):
-
-    def testRootPage(self):
-        response = self.publish('/')
-        self.assertEquals(response.getStatus(), 200)
-
-    def testRootPage_preferred_languages(self):
-        response = self.publish('/', env={'HTTP_ACCEPT_LANGUAGE': 'en'})
-        self.assertEquals(response.getStatus(), 200)
-
-    def testNotExisting(self):
-        response = self.publish('/nosuchthing', handle_errors=True)
-        self.assertEquals(response.getStatus(), 404)
-
-    def testLinks(self):
-        response = self.publish('/')
-        self.assertEquals(response.getStatus(), 200)
-        self.checkForBrokenLinks(response.consumeBody(), response.getPath())
-
-
-def sample_test_suite():
-    suite = unittest.TestSuite()
-    suite.addTest(unittest.makeSuite(SampleFunctionalTest))
-    return suite
-
-
-class HTTPCaller(CookieHandler):
-    """Execute an HTTP request string via the publisher"""
-
-    def __call__(self, request_string, handle_errors=True, form=None):
-        # Commit work done by previous python code.
-        commit()
-
-        # Discard leading white space to make call layout simpler
-        request_string = request_string.lstrip()
-
-        # split off and parse the command line
-        l = request_string.find('\n')
-        command_line = request_string[:l].rstrip()
-        request_string = request_string[l+1:]
-        method, path, protocol = command_line.split()
-
-        instream = StringIO(request_string)
-        environment = {"HTTP_COOKIE": self.httpCookie(path),
-                       "HTTP_HOST": 'localhost',
-                       "HTTP_REFERER": 'localhost',
-                       "REQUEST_METHOD": method,
-                       "SERVER_PROTOCOL": protocol,
-                       }
-
-        headers = [split_header(header)
-                   for header in rfc822.Message(instream).headers]
-        for name, value in headers:
-            name = ('_'.join(name.upper().split('-')))
-            if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
-                name = 'HTTP_' + name
-            environment[name] = value.rstrip()
-
-        auth_key = 'HTTP_AUTHORIZATION'
-        if environment.has_key(auth_key):
-            environment[auth_key] = auth_header(environment[auth_key])
-
-        old_site = getSite()
-        setSite(None)
-
-        request_cls, publication_cls = self.chooseRequestClass(method, path,
-                                                               environment)
-        app = FunctionalTestSetup().getApplication()
-
-        request = app._request(
-            path, instream,
-            environment=environment,
-            request=request_cls, publication=publication_cls)
-        if IBrowserRequest.providedBy(request):
-            # only browser requests have skins
-            setDefaultSkin(request)
-
-        if form is not None:
-            if request.form:
-                raise ValueError("only one set of form values can be provided")
-            request.form = form
-
-        request = publish(request, handle_errors=handle_errors)
-
-        response = ResponseWrapper(
-            request.response, path,
-            omit=('x-content-type-warning', 'x-powered-by'),
-            )
-
-        self.saveCookies(response)
-        setSite(old_site)
-
-        # sync Python connection:
-        getRootFolder()._p_jar.sync()
-
-        return response
-
-    def chooseRequestClass(self, method, path, environment):
-        """Choose and return a request class and a publication class"""
-        # note that `path` is unused by the default implementation (BBB)
-        return chooseClasses(method, environment)
-
-
-def FunctionalDocFileSuite(*paths, **kw):
-    """Build a functional test suite from a text file."""
-    kw['package'] = doctest._normalize_module(kw.get('package'))
-    _prepare_doctest_keywords(kw)
-    suite = doctest.DocFileSuite(*paths, **kw)
-    suite.layer = Functional
-    return suite
-
-
-def FunctionalDocTestSuite(*paths, **kw):
-    """Build a functional test suite from docstrings in a module."""
-    _prepare_doctest_keywords(kw)
-    suite = doctest.DocTestSuite(*paths, **kw)
-    suite.layer = Functional
-    return suite
-
-
-def _prepare_doctest_keywords(kw):
-    globs = kw.setdefault('globs', {})
-    globs['http'] = HTTPCaller()
-    globs['getRootFolder'] = getRootFolder
-    globs['sync'] = sync
-
-    kwsetUp = kw.get('setUp')
-    def setUp(test):
-        FunctionalTestSetup().setUp()
-
-        if kwsetUp is not None:
-            kwsetUp(test)
-    kw['setUp'] = setUp
-
-    kwtearDown = kw.get('tearDown')
-    def tearDown(test):
-        if kwtearDown is not None:
-            kwtearDown(test)
-        FunctionalTestSetup().tearDown()
-    kw['tearDown'] = tearDown
-
-    if 'optionflags' not in kw:
-        old = doctest.set_unittest_reportflags(0)
-        doctest.set_unittest_reportflags(old)
-        kw['optionflags'] = (old
-                             | doctest.ELLIPSIS
-                             | doctest.REPORT_NDIFF
-                             | doctest.NORMALIZE_WHITESPACE)
-
-
-if __name__ == '__main__':
-    unittest.main()

Copied: zope.app.testing/tags/3.4.3/src/zope/app/testing/functional.py (from rev 88823, zope.app.testing/trunk/src/zope/app/testing/functional.py)
===================================================================
--- zope.app.testing/tags/3.4.3/src/zope/app/testing/functional.py	                        (rev 0)
+++ zope.app.testing/tags/3.4.3/src/zope/app/testing/functional.py	2008-07-25 17:24:01 UTC (rev 88824)
@@ -0,0 +1,768 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Functional testing framework for Zope 3.
+
+There should be a file 'ftesting.zcml' in the current directory.
+
+$Id$
+"""
+import logging
+import os.path
+import re
+import rfc822
+import sys
+import traceback
+import unittest
+from StringIO import StringIO
+from Cookie import SimpleCookie
+from transaction import abort, commit
+from ZODB.DB import DB
+from ZODB.DemoStorage import DemoStorage
+from ZODB.interfaces import IDatabase
+
+from zope import component
+from zope.publisher.browser import BrowserRequest, setDefaultSkin
+from zope.publisher.http import HTTPRequest
+from zope.publisher.publish import publish
+from zope.security.interfaces import Forbidden, Unauthorized
+from zope.testing import doctest
+
+import zope.app.testing.setup
+from zope.app.appsetup.appsetup import multi_database
+from zope.app.debug import Debugger
+from zope.app.publication.http import HTTPPublication
+from zope.app.publication.zopepublication import ZopePublication
+from zope.app.publication.http import HTTPPublication
+from zope.app.publication.httpfactory import chooseClasses
+from zope.publisher.interfaces.browser import IBrowserRequest
+from zope.app.component.hooks import setSite, getSite
+
+
+class ResponseWrapper(object):
+    """A wrapper that adds several introspective methods to a response."""
+
+    def __init__(self, response, path, omit=()):
+        self._response = response
+        self._path = path
+        self.omit = omit
+        self._body = None
+
+    def getOutput(self):
+        """Returns the full HTTP output (headers + body)"""
+        body = self.getBody()
+        omit = self.omit
+        headers = [x
+                   for x in self._response.getHeaders()
+                   if x[0].lower() not in omit]
+        headers.sort()
+        headers = '\n'.join([("%s: %s" % (n, v)) for (n, v) in headers])
+        statusline = '%s %s' % (self._response._request['SERVER_PROTOCOL'],
+                                self._response.getStatusString())
+        if body:
+            return '%s\n%s\n\n%s' %(statusline, headers, body)
+        else:
+            return '%s\n%s\n' % (statusline, headers)
+
+    def getBody(self):
+        """Returns the response body"""
+        if self._body is None:
+            self._body = ''.join(self._response.consumeBody())
+
+        return self._body
+
+    def getPath(self):
+        """Returns the path of the request"""
+        return self._path
+
+    def __getattr__(self, attr):
+        return getattr(self._response, attr)
+
+    __str__ = getOutput
+
+
+class IManagerSetup(zope.interface.Interface):
+    """Utility for setting up a functional testing manager with needed grants
+
+    TODO This is an interim solution.  It tries to break the dependence
+    on a particular security policy, however, we need a much better
+    way of managing functional-testing configurations.
+    """
+
+    def setUpManager():
+        """Set up the manager, zope.mgr
+        """
+
+class BaseDatabaseFactory(object):
+    """Factory object for passing to appsetup.multi_database
+
+    This class is an internal implementation detail, subject to change
+    without notice!
+
+    It is currently used by FunctionalTestSetup.__init__ to create the
+    basic storage(s) containing the data that is common to all tests
+    in a layer.
+
+    The constructor takes the name of the new database, and a
+    dictionary of storages.  The 'open' method creates a new
+    DemoStorage, and adds it to the storage dictionary under the given
+    name. Then creates and returns a named DB object using the
+    storage.
+    """
+
+    def __init__(self, name, base_storages):
+        self.name = name
+        self.base_storages = base_storages
+
+    def open(self):
+        name = self.name
+        if name in self.base_storages:
+            raise ValueError("Duplicate database name: %r" % name)
+        storage = DemoStorage("Memory storage %r" % name)
+        self.base_storages[name] = storage
+        return DB(storage, database_name=name)
+
+
+class DerivedDatabaseFactory(object):
+    """Factory object for passing to appsetup.multi_database
+
+    This class is an internal implementation detail, subject to change
+    without notice!
+
+    It is currently used by FunctionalTestSetup.setUp to create the
+    derived storage(s) used for each test in a layer.
+
+    The constructor takes the name of the new database, and a
+    dictionary of storages.  The 'open' method creates a new
+    DemoStorage as a wrapper around the storage with the given
+    name. Then creates and returns a named DB object using the
+    storage.
+    """
+
+    def __init__(self, name, base_storages):
+        self.name = name
+        self.storage = DemoStorage("Demo storage %r" % name,
+                                   base_storages[name])
+
+    def open(self):
+        return DB(self.storage, database_name=self.name)
+
+
+class FunctionalTestSetup(object):
+    """Keeps shared state across several functional test cases."""
+
+    __shared_state = { '_init': False }
+
+    def __init__(self, config_file=None, database_names=None):
+        """Initializes Zope 3 framework.
+
+        Creates a volatile memory storage.  Parses Zope3 configuration files.
+        """
+        self.__dict__ = self.__shared_state
+
+        if database_names is not None:
+            database_names = tuple(database_names)
+
+        if not self._init:
+
+            # Make sure unit tests are cleaned up
+            zope.app.testing.setup.placefulSetUp()
+            zope.app.testing.setup.placefulTearDown()
+
+            if not config_file:
+                config_file = 'ftesting.zcml'
+            if database_names is None:
+                database_names = ('unnamed',)
+            self.log = StringIO()
+            # Make it silent but keep the log available for debugging
+            logging.root.addHandler(logging.StreamHandler(self.log))
+
+            self._base_storages = {}
+            self.db = multi_database(
+                BaseDatabaseFactory(name, self._base_storages)
+                for name in database_names
+                )[0][0]
+            self.app = Debugger(self.db, config_file)
+
+            self.connection = None
+            self._config_file = config_file
+            self._database_names = database_names
+            self._init = True
+
+            # Make a local grant for the test user
+            setup = component.queryUtility(IManagerSetup)
+            if setup is not None:
+                setup.setUpManager()
+
+            FunctionalTestSetup().connection = None
+
+        elif config_file and config_file != self._config_file:
+            # Running different tests with different configurations is not
+            # supported at the moment
+            raise NotImplementedError('Already configured'
+                                      ' with a different config file')
+
+        elif database_names and database_names != self._database_names:
+            # Running different tests with different configurations is not
+            # supported at the moment
+            raise NotImplementedError('Already configured'
+                                      ' with different database names')
+
+    # BBB: Simulate the old base_storage attribute, but only when not using
+    # multiple databases. There *is* code in the wild that uses the attribute.
+    def _get_base_storage(self):
+        if len(self._database_names)!=1:
+            raise AttributeError('base_storage')
+        return self._base_storages[self._database_names[0]]
+
+    def _set_base_storage(self, value):
+        if len(self._database_names)!=1:
+            raise AttributeError('base_storage')
+        self._base_storages[self._database_names[0]] = value
+
+    base_storage = property(_get_base_storage, _set_base_storage)
+
+    def _close_databases(self):
+        base = component.getGlobalSiteManager()
+        for name, db in self.db.databases.iteritems():
+            db.close()
+            base.unregisterUtility(db, IDatabase, name)
+
+    def setUp(self):
+        """Prepares for a functional test case."""
+        # Tear down the old demo storages (if any) and create fresh ones
+        abort()
+        self._close_databases()
+        self.db = self.app.db = multi_database(
+            DerivedDatabaseFactory(name, self._base_storages)
+            for name in self._database_names
+            )[0][0]
+        self.connection = None
+
+    def tearDown(self):
+        """Cleans up after a functional test case."""
+        abort()
+        if self.connection:
+            self.connection.close()
+            self.connection = None
+        self._close_databases()
+        setSite(None)
+
+    def tearDownCompletely(self):
+        """Cleans up the setup done by the constructor."""
+        zope.app.testing.setup.placefulTearDown()
+        self._config_file = False
+        self._database_names = None
+        self._init = False
+
+    def getRootFolder(self):
+        """Returns the Zope root folder."""
+        if not self.connection:
+            self.connection = self.db.open()
+        root = self.connection.root()
+        return root[ZopePublication.root_name]
+
+    def getApplication(self):
+        """Returns the Zope application instance."""
+        return self.app
+
+
+class ZCMLLayer:
+    """ZCML-defined test layer
+    """
+
+    __bases__ = ()
+
+    def __init__(self, config_file, module, name, allow_teardown=False):
+        self.config_file = config_file
+        self.__module__ = module
+        self.__name__ = name
+        self.allow_teardown = allow_teardown
+
+    def setUp(self):
+        self.setup = FunctionalTestSetup(self.config_file)
+
+    def tearDown(self):
+        self.setup.tearDownCompletely()
+        if not self.allow_teardown:
+            # Some ZCML directives change globals but are not accompanied
+            # with registered CleanUp handlers to undo the changes.  Let
+            # packages which use such directives indicate that they do not
+            # support tearing down.
+            raise NotImplementedError
+
+
+def defineLayer(name, zcml='test.zcml', allow_teardown=False):
+    """Helper function for defining layers.
+
+    Usage: defineLayer('foo')
+    """
+    globals = sys._getframe(1).f_globals
+    globals[name] = ZCMLLayer(
+        os.path.join(os.path.split(globals['__file__'])[0], zcml),
+        globals['__name__'],
+        name,
+        allow_teardown=allow_teardown,
+        )
+
+if os.path.exists(os.path.join('zopeskel', 'etc', 'ftesting.zcml')):
+    Functional = os.path.join('zopeskel', 'etc', 'ftesting.zcml')
+    FunctionalNoDevMode = os.path.join('zopeskel', 'etc', 'ftesting-base.zcml')
+elif os.path.exists(os.path.join('etc', 'ftesting.zcml')):
+    Functional = os.path.join('etc', 'ftesting.zcml')
+    FunctionalNoDevMode = os.path.join('etc', 'ftesting-base.zcml')
+else:
+    # let's hope that the file is in our CWD. If not, we'll get an
+    # error anyways, but we can't just throw an error if we don't find
+    # that file. This module might be imported for other things as
+    # well, not only starting up Zope from ftesting.zcml.
+    Functional = 'ftesting.zcml'
+    FunctionalNoDevMode = 'ftesting-base.zcml'
+
+Functional = os.path.abspath(Functional)
+FunctionalNoDevMode = os.path.abspath(FunctionalNoDevMode)
+
+Functional = ZCMLLayer(Functional, __name__, 'Functional')
+FunctionalNoDevMode = ZCMLLayer(FunctionalNoDevMode, __name__,
+                                'FunctionalNoDevMode')
+
+class FunctionalTestCase(unittest.TestCase):
+    """Functional test case."""
+
+    layer = Functional
+
+    def setUp(self):
+        """Prepares for a functional test case."""
+        super(FunctionalTestCase, self).setUp()
+        FunctionalTestSetup().setUp()
+
+    def tearDown(self):
+        """Cleans up after a functional test case."""
+        FunctionalTestSetup().tearDown()
+        super(FunctionalTestCase, self).tearDown()
+
+    def getRootFolder(self):
+        """Returns the Zope root folder."""
+        return FunctionalTestSetup().getRootFolder()
+
+    def commit(self):
+        commit()
+
+    def abort(self):
+        abort()
+
+
+class CookieHandler(object):
+
+    def __init__(self, *args, **kw):
+        # Somewhere to store cookies between consecutive requests
+        self.cookies = SimpleCookie()
+        super(CookieHandler, self).__init__(*args, **kw)
+
+
+    def httpCookie(self, path):
+         """Return self.cookies as an HTTP_COOKIE environment value."""
+         l = [m.OutputString().split(';')[0] for m in self.cookies.values()
+              if path.startswith(m['path'])]
+         return '; '.join(l)
+
+    def loadCookies(self, envstring):
+        self.cookies.load(envstring)
+
+    def saveCookies(self, response):
+        """Save cookies from the response."""
+        # Urgh - need to play with the response's privates to extract
+        # cookies that have been set
+        # TODO: extend the IHTTPRequest interface to allow access to all
+        # cookies
+        # TODO: handle cookie expirations
+        for k,v in response._cookies.items():
+            k = k.encode('utf8')
+            self.cookies[k] = v['value'].encode('utf8')
+            if v.has_key('path'):
+                self.cookies[k]['path'] = v['path']
+
+
+class BrowserTestCase(CookieHandler, FunctionalTestCase):
+    """Functional test case for Browser requests."""
+
+    def setSite(self, site):
+        """Set the site which will be used to look up local components"""
+        setSite(site)
+
+    def getSite(self):
+        """Returns the site which is used to look up local components"""
+        return getSite()
+
+    def makeRequest(self, path='', basic=None, form=None, env={}):
+        """Creates a new request object.
+
+        Arguments:
+          path   -- the path to be traversed (e.g. "/folder1/index.html")
+          basic  -- basic HTTP authentication credentials ("user:password")
+          form   -- a dictionary emulating a form submission
+                    (Note that field values should be Unicode strings)
+          env    -- a dictionary of additional environment variables
+                    (You can emulate HTTP request header
+                       X-Header: foo
+                     by adding 'HTTP_X_HEADER': 'foo' to env)
+        """
+        environment = {"HTTP_HOST": 'localhost',
+                       "HTTP_REFERER": 'localhost',
+                       "HTTP_COOKIE": self.httpCookie(path)}
+        environment.update(env)
+        app = FunctionalTestSetup().getApplication()
+        request = app._request(path, '',
+                               environment=environment,
+                               basic=basic, form=form,
+                               request=BrowserRequest)
+        setDefaultSkin(request)
+        return request
+
+    def publish(self, path, basic=None, form=None, env={},
+                handle_errors=False):
+        """Renders an object at a given location.
+
+        Arguments are the same as in makeRequest with the following exception:
+          handle_errors  -- if False (default), exceptions will not be caught
+                            if True, exceptions will return a formatted error
+                            page.
+
+        Returns the response object enhanced with the following methods:
+          getOutput()    -- returns the full HTTP output as a string
+          getBody()      -- returns the full response body as a string
+          getPath()      -- returns the path used in the request
+        """
+        old_site = self.getSite()
+        self.setSite(None)
+        # A cookie header has been sent - ensure that future requests
+        # in this test also send the cookie, as this is what browsers do.
+        # We pull it apart and reassemble the header to block cookies
+        # with invalid paths going through, which may or may not be correct
+        if env.has_key('HTTP_COOKIE'):
+            self.loadCookies(env['HTTP_COOKIE'])
+            del env['HTTP_COOKIE'] # Added again in makeRequest
+
+        request = self.makeRequest(path, basic=basic, form=form, env=env)
+        if env.has_key('HTTP_COOKIE'):
+            self.loadCookies(env['HTTP_COOKIE'])
+
+        request = publish(request, handle_errors=handle_errors)
+
+        response = ResponseWrapper(request.response, path)
+
+        self.saveCookies(response)
+        self.setSite(old_site)
+        return response
+
+    def checkForBrokenLinks(self, body, path, basic=None):
+        """Looks for broken links in a page by trying to traverse relative
+        URIs.
+        """
+        if not body: return
+
+        old_site = self.getSite()
+        self.setSite(None)
+
+        from htmllib import HTMLParser
+        from formatter import NullFormatter
+        class SimpleHTMLParser(HTMLParser):
+            def __init__(self, fmt, base):
+                HTMLParser.__init__(self, fmt)
+                self.base = base
+            def do_base(self, attrs):
+                self.base = dict(attrs).get('href', self.base)
+
+        parser = SimpleHTMLParser(NullFormatter(), path)
+        parser.feed(body)
+        parser.close()
+        base = parser.base
+        while not base.endswith('/'):
+            base = base[:-1]
+        if base.startswith('http://localhost/'):
+            base = base[len('http://localhost/') - 1:]
+
+        errors = []
+        for a in parser.anchorlist:
+            if a.startswith('http://localhost/'):
+                a = a[len('http://localhost/') - 1:]
+            elif a.find(':') != -1:
+                # Assume it is an external link
+                continue
+            elif not a.startswith('/'):
+                a = base + a
+            if a.find('#') != -1:
+                a = a[:a.index('#')]
+            # ??? what about queries (/path/to/foo?bar=baz&etc)?
+            request = None
+            try:
+                try:
+                    request = self.makeRequest(a, basic=basic)
+                    publication = request.publication
+                    request.processInputs()
+                    publication.beforeTraversal(request)
+                    object = publication.getApplication(request)
+                    object = request.traverse(object)
+                    publication.afterTraversal(request, object)
+                except (KeyError, NameError, AttributeError, Unauthorized,
+                        Forbidden):
+                    e = traceback.format_exception_only(
+                        *sys.exc_info()[:2])[-1]
+                    errors.append((a, e.strip()))
+            finally:
+                publication.endRequest(request, object)
+                self.setSite(old_site)
+
+                # Make sure we don't have pending changes
+                abort()
+
+                # The request should always be closed to free resources
+                # held by the request
+                if request:
+                    request.close()
+        if errors:
+            self.fail("%s contains broken links:\n" % path
+                      + "\n".join(["  %s:\t%s" % (a, e) for a, e in errors]))
+
+
+class HTTPTestCase(FunctionalTestCase):
+    """Functional test case for HTTP requests."""
+
+    def makeRequest(self, path='', basic=None, form=None, env={},
+                    instream=None):
+        """Creates a new request object.
+
+        Arguments:
+          path   -- the path to be traversed (e.g. "/folder1/index.html")
+          basic  -- basic HTTP authentication credentials ("user:password")
+          form   -- a dictionary emulating a form submission
+                    (Note that field values should be Unicode strings)
+          env    -- a dictionary of additional environment variables
+                    (You can emulate HTTP request header
+                       X-Header: foo
+                     by adding 'HTTP_X_HEADER': 'foo' to env)
+          instream  -- a stream from where the HTTP request will be read
+        """
+        if instream is None:
+            instream = ''
+        environment = {"HTTP_HOST": 'localhost',
+                       "HTTP_REFERER": 'localhost'}
+        environment.update(env)
+        app = FunctionalTestSetup().getApplication()
+        request = app._request(path, instream,
+                               environment=environment,
+                               basic=basic, form=form,
+                               request=HTTPRequest, publication=HTTPPublication)
+        return request
+
+    def publish(self, path, basic=None, form=None, env={},
+                handle_errors=False, request_body=''):
+        """Renders an object at a given location.
+
+        Arguments are the same as in makeRequest with the following exception:
+          handle_errors  -- if False (default), exceptions will not be caught
+                            if True, exceptions will return a formatted error
+                            page.
+
+        Returns the response object enhanced with the following methods:
+          getOutput()    -- returns the full HTTP output as a string
+          getBody()      -- returns the full response body as a string
+          getPath()      -- returns the path used in the request
+        """
+        request = self.makeRequest(path, basic=basic, form=form, env=env,
+                                   instream=request_body)
+        response = ResponseWrapper(request.response, path)
+        publish(request, handle_errors=handle_errors)
+        return response
+
+
+headerre = re.compile(r'(\S+): (.+)$')
+def split_header(header):
+    return headerre.match(header).group(1, 2)
+
+basicre = re.compile('Basic (.+)?:(.+)?$')
+def auth_header(header):
+    match = basicre.match(header)
+    if match:
+        import base64
+        u, p = match.group(1, 2)
+        if u is None:
+            u = ''
+        if p is None:
+            p = ''
+        auth = base64.encodestring('%s:%s' % (u, p))
+        return 'Basic %s' % auth[:-1]
+    return header
+
+def getRootFolder():
+    return FunctionalTestSetup().getRootFolder()
+
+def sync():
+    getRootFolder()._p_jar.sync()
+
+#
+# Sample functional test case
+#
+
+class SampleFunctionalTest(BrowserTestCase):
+
+    def testRootPage(self):
+        response = self.publish('/')
+        self.assertEquals(response.getStatus(), 200)
+
+    def testRootPage_preferred_languages(self):
+        response = self.publish('/', env={'HTTP_ACCEPT_LANGUAGE': 'en'})
+        self.assertEquals(response.getStatus(), 200)
+
+    def testNotExisting(self):
+        response = self.publish('/nosuchthing', handle_errors=True)
+        self.assertEquals(response.getStatus(), 404)
+
+    def testLinks(self):
+        response = self.publish('/')
+        self.assertEquals(response.getStatus(), 200)
+        self.checkForBrokenLinks(response.consumeBody(), response.getPath())
+
+
+def sample_test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(SampleFunctionalTest))
+    return suite
+
+
+class HTTPCaller(CookieHandler):
+    """Execute an HTTP request string via the publisher"""
+
+    def __call__(self, request_string, handle_errors=True, form=None):
+        # Commit work done by previous python code.
+        commit()
+
+        # Discard leading white space to make call layout simpler
+        request_string = request_string.lstrip()
+
+        # split off and parse the command line
+        l = request_string.find('\n')
+        command_line = request_string[:l].rstrip()
+        request_string = request_string[l+1:]
+        method, path, protocol = command_line.split()
+
+        instream = StringIO(request_string)
+        environment = {"HTTP_COOKIE": self.httpCookie(path),
+                       "HTTP_HOST": 'localhost',
+                       "HTTP_REFERER": 'localhost',
+                       "REQUEST_METHOD": method,
+                       "SERVER_PROTOCOL": protocol,
+                       }
+
+        headers = [split_header(header)
+                   for header in rfc822.Message(instream).headers]
+        for name, value in headers:
+            name = ('_'.join(name.upper().split('-')))
+            if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
+                name = 'HTTP_' + name
+            environment[name] = value.rstrip()
+
+        auth_key = 'HTTP_AUTHORIZATION'
+        if environment.has_key(auth_key):
+            environment[auth_key] = auth_header(environment[auth_key])
+
+        old_site = getSite()
+        setSite(None)
+
+        request_cls, publication_cls = self.chooseRequestClass(method, path,
+                                                               environment)
+        app = FunctionalTestSetup().getApplication()
+
+        request = app._request(
+            path, instream,
+            environment=environment,
+            request=request_cls, publication=publication_cls)
+        if IBrowserRequest.providedBy(request):
+            # only browser requests have skins
+            setDefaultSkin(request)
+
+        if form is not None:
+            if request.form:
+                raise ValueError("only one set of form values can be provided")
+            request.form = form
+
+        request = publish(request, handle_errors=handle_errors)
+
+        response = ResponseWrapper(
+            request.response, path,
+            omit=('x-content-type-warning', 'x-powered-by'),
+            )
+
+        self.saveCookies(response)
+        setSite(old_site)
+
+        # sync Python connection:
+        getRootFolder()._p_jar.sync()
+
+        return response
+
+    def chooseRequestClass(self, method, path, environment):
+        """Choose and return a request class and a publication class"""
+        # note that `path` is unused by the default implementation (BBB)
+        return chooseClasses(method, environment)
+
+
+def FunctionalDocFileSuite(*paths, **kw):
+    """Build a functional test suite from a text file."""
+    kw['package'] = doctest._normalize_module(kw.get('package'))
+    _prepare_doctest_keywords(kw)
+    suite = doctest.DocFileSuite(*paths, **kw)
+    suite.layer = Functional
+    return suite
+
+
+def FunctionalDocTestSuite(*paths, **kw):
+    """Build a functional test suite from docstrings in a module."""
+    _prepare_doctest_keywords(kw)
+    suite = doctest.DocTestSuite(*paths, **kw)
+    suite.layer = Functional
+    return suite
+
+
+def _prepare_doctest_keywords(kw):
+    globs = kw.setdefault('globs', {})
+    globs['http'] = HTTPCaller()
+    globs['getRootFolder'] = getRootFolder
+    globs['sync'] = sync
+
+    kwsetUp = kw.get('setUp')
+    def setUp(test):
+        FunctionalTestSetup().setUp()
+
+        if kwsetUp is not None:
+            kwsetUp(test)
+    kw['setUp'] = setUp
+
+    kwtearDown = kw.get('tearDown')
+    def tearDown(test):
+        if kwtearDown is not None:
+            kwtearDown(test)
+        FunctionalTestSetup().tearDown()
+    kw['tearDown'] = tearDown
+
+    if 'optionflags' not in kw:
+        old = doctest.set_unittest_reportflags(0)
+        doctest.set_unittest_reportflags(old)
+        kw['optionflags'] = (old
+                             | doctest.ELLIPSIS
+                             | doctest.REPORT_NDIFF
+                             | doctest.NORMALIZE_WHITESPACE)
+
+
+if __name__ == '__main__':
+    unittest.main()

Deleted: zope.app.testing/tags/3.4.3/src/zope/app/testing/tests.py
===================================================================
--- zope.app.testing/trunk/src/zope/app/testing/tests.py	2008-07-25 12:41:50 UTC (rev 88814)
+++ zope.app.testing/tags/3.4.3/src/zope/app/testing/tests.py	2008-07-25 17:24:01 UTC (rev 88824)
@@ -1,431 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2004 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Test tcpdoc
-
-$Id$
-"""
-import os
-import re
-import unittest
-import StringIO
-
-from zope.testing.doctestunit import DocTestSuite
-from zope.testing.renormalizing import RENormalizing
-
-import zope.app.testing
-from zope.app.publication.requestpublicationregistry import factoryRegistry
-from zope.app.publication.requestpublicationfactories import BrowserFactory
-from zope.app.testing import functional
-from zope.app.testing.dochttp import dochttp
-import transaction
-from zope.app.testing.functional import SampleFunctionalTest, BrowserTestCase
-from zope.app.testing.functional import FunctionalDocFileSuite
-from zope.app.testing.functional import FunctionalTestCase
-from zope.app.testing.testing import AppTestingLayer
-
-from zope.app.testing.testing import FailingKlass
-
-
-
-HEADERS = """\
-HTTP/1.1 200 OK
-Content-Type: text/plain
-"""
-
-BODY = """\
-This is the response body.
-"""
-
-directory = os.path.join(os.path.split(zope.app.testing.__file__)[0],
-                         'recorded')
-
-expected = r'''
-
-  >>> print http(r"""
-  ... GET /@@contents.html HTTP/1.1
-  ... """)
-  HTTP/1.1 401 Unauthorized
-  Content-Length: 89
-  Content-Type: text/html;charset=utf-8
-  Www-Authenticate: basic realm="Zope"
-  <BLANKLINE>
-  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
-        lang="en">
-  <BLANKLINE>
-  ...
-  <BLANKLINE>
-  </html>
-  <BLANKLINE>
-  <BLANKLINE>
-
-
-  >>> print http(r"""
-  ... GET /@@contents.html HTTP/1.1
-  ... Authorization: Basic bWdyOm1ncnB3
-  ... """)
-  HTTP/1.1 200 OK
-  Content-Length: 89
-  Content-Type: text/html;charset=utf-8
-  <BLANKLINE>
-  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
-        lang="en">
-  <BLANKLINE>
-  ...
-  <BLANKLINE>
-  </html>
-  <BLANKLINE>
-  <BLANKLINE>
-
-
-  >>> print http(r"""
-  ... GET /++etc++site/@@manage HTTP/1.1
-  ... Authorization: Basic bWdyOm1ncnB3
-  ... Referer: http://localhost:8081/
-  ... """)
-  HTTP/1.1 303 See Other
-  Content-Length: 0
-  Content-Type: text/plain;charset=utf-8
-  Location: @@tasks.html
-  <BLANKLINE>
-
-
-  >>> print http(r"""
-  ... GET / HTTP/1.1
-  ... Authorization: Basic bWdyOm1ncnB3
-  ... """)
-  HTTP/1.1 200 OK
-  Content-Length: 89
-  Content-Type: text/html;charset=utf-8
-  <BLANKLINE>
-  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
-        lang="en">
-  <BLANKLINE>
-  ...
-  <BLANKLINE>
-  </html>
-  <BLANKLINE>
-  <BLANKLINE>
-
-
-  >>> print http(r"""
-  ... GET /++etc++site/@@tasks.html HTTP/1.1
-  ... Authorization: Basic bWdyOm1ncnB3
-  ... Referer: http://localhost:8081/
-  ... """)
-  HTTP/1.1 200 OK
-  Content-Length: 89
-  Content-Type: text/html;charset=utf-8
-  <BLANKLINE>
-  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
-        lang="en">
-  <BLANKLINE>
-  ...
-  <BLANKLINE>
-  </html>
-  <BLANKLINE>
-  <BLANKLINE>
-'''
-
-
-class FunctionalHTTPDocTest(unittest.TestCase):
-
-    def test_dochttp(self):
-        import sys
-        old = sys.stdout
-        sys.stdout = StringIO.StringIO()
-        dochttp(['-p', 'test', directory])
-        got = sys.stdout.getvalue()
-        sys.stdout = old
-        self.assertEquals(expected, got)
-
-
-class AuthHeaderTestCase(unittest.TestCase):
-
-    def test_auth_encoded(self):
-        auth_header = functional.auth_header
-        header = 'Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3'
-        self.assertEquals(auth_header(header), header)
-
-    def test_auth_non_encoded(self):
-        auth_header = functional.auth_header
-        header = 'Basic globalmgr:globalmgrpw'
-        expected = 'Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3'
-        self.assertEquals(auth_header(header), expected)
-
-    def test_auth_non_encoded_empty(self):
-        auth_header = functional.auth_header
-        header = 'Basic globalmgr:'
-        expected = 'Basic Z2xvYmFsbWdyOg=='
-        self.assertEquals(auth_header(header), expected)
-        header = 'Basic :pass'
-        expected = 'Basic OnBhc3M='
-        self.assertEquals(auth_header(header), expected)
-
-    def test_auth_non_encoded_colon(self):
-        auth_header = zope.app.testing.functional.auth_header
-        header = 'Basic globalmgr:pass:pass'
-        expected = 'Basic Z2xvYmFsbWdyOnBhc3M6cGFzcw=='
-        self.assertEquals(auth_header(header), expected)
-
-
-class HTTPCallerTestCase(unittest.TestCase):
-
-    def test_chooseRequestClass(self):
-        from zope.publisher.interfaces import IRequest, IPublication
-
-        factoryRegistry.register('GET', '*', 'browser', 0, BrowserFactory())
-
-        caller = functional.HTTPCaller()
-        request_class, publication_class = caller.chooseRequestClass(
-            method='GET', path='/', environment={})
-
-        self.assert_(IRequest.implementedBy(request_class))
-        self.assert_(IPublication.implementedBy(publication_class))
-
-
-class DummyCookiesResponse(object):
-    # Ugh, this simulates the *internals* of a HTTPResponse object
-    # TODO: expand the IHTTPResponse interface to give access to all cookies
-    _cookies = None
-
-    def __init__(self, cookies=None):
-        if not cookies:
-            cookies = {}
-        self._cookies = cookies
-
-
-class CookieHandlerTestCase(unittest.TestCase):
-    def setUp(self):
-        self.handler = functional.CookieHandler()
-
-    def test_saveCookies(self):
-        response = DummyCookiesResponse(dict(
-            spam=dict(value='eggs', path='/foo', comment='rest is ignored'),
-            monty=dict(value='python')))
-        self.handler.saveCookies(response)
-        self.assertEqual(len(self.handler.cookies), 2)
-        self.assert_(self.handler.cookies['spam'].OutputString() in
-                         ('spam=eggs; Path=/foo;','spam=eggs; Path=/foo'))
-        self.assert_(self.handler.cookies['monty'].OutputString() in
-                         ('monty=python;','monty=python'))
-
-    def test_httpCookie(self):
-        cookies = self.handler.cookies
-        cookies['spam'] = 'eggs'
-        cookies['spam']['path'] = '/foo'
-        cookies['bar'] = 'baz'
-        cookies['bar']['path'] = '/foo/baz'
-        cookies['monty'] = 'python'
-
-        cookieHeader = self.handler.httpCookie('/foo/bar')
-        parts = cookieHeader.split('; ')
-        parts.sort()
-        self.assertEqual(parts, ['monty=python', 'spam=eggs'])
-
-        cookieHeader = self.handler.httpCookie('/foo/baz')
-        parts = cookieHeader.split('; ')
-        parts.sort()
-        self.assertEqual(parts, ['bar=baz', 'monty=python', 'spam=eggs'])
-
-    # There is no test for CookieHandler.loadCookies because it that method
-    # only passes the arguments on to Cookie.BaseCookie.load, which the
-    # standard library has tests for (we hope).
-
-
-class CookieFunctionalTest(BrowserTestCase):
-
-    """Functional tests should handle cookies like a web browser
-
-    Multiple requests in the same test should acumulate cookies.
-    We also ensure that cookies with path values are only sent for
-    the correct URL's so we can test cookies don't 'leak'. Expiry,
-    secure and other cookie attributes are not being worried about
-    at the moment
-
-    """
-
-    def setUp(self):
-        super(CookieFunctionalTest, self).setUp()
-        self.assertEqual(
-                len(self.cookies.keys()), 0,
-                'cookies store should be empty'
-                )
-
-        root = self.getRootFolder()
-
-        from zope.app.zptpage.zptpage import ZPTPage
-
-        page = ZPTPage()
-
-        page.source = u'''<tal:tag tal:define="
-        cookies python:['%s=%s'%(k,v) for k,v in request.getCookies().items()]"
-        ><tal:tag tal:define="
-        ignored python:cookies.sort()"
-        /><span tal:replace="python:';'.join(cookies)" /></tal:tag>'''
-
-        root['getcookies'] = page
-
-        page = ZPTPage()
-
-        page.source = u'''<tal:tag tal:define="
-            ignored python:request.response.setCookie('bid','bval')" >
-            <h1 tal:condition="ignored" />
-            </tal:tag>'''
-
-        root['setcookie'] = page
-        transaction.commit()
-
-    def tearDown(self):
-        root = self.getRootFolder()
-        del root['getcookies']
-        del root['setcookie']
-        super(CookieFunctionalTest, self).tearDown()
-
-    def testDefaultCookies(self):
-        # By default no cookies are set
-        response = self.publish('/')
-        self.assertEquals(response.getStatus(), 200)
-        self.assert_(not response._request._cookies)
-
-    def testSimpleCookies(self):
-        self.cookies['aid'] = 'aval'
-        response = self.publish('/')
-        self.assertEquals(response.getStatus(), 200)
-        self.assertEquals(response._request._cookies['aid'], 'aval')
-
-    def testCookiePaths(self):
-        # We only send cookies if the path is correct
-        self.cookies['aid'] = 'aval'
-        self.cookies['aid']['Path'] = '/sub/folder'
-        self.cookies['bid'] = 'bval'
-        response = self.publish('/')
-
-        self.assertEquals(response.getStatus(), 200)
-        self.assert_(not response._request._cookies.has_key('aid'))
-        self.assertEquals(response._request._cookies['bid'], 'bval')
-
-    def testHttpCookieHeader(self):
-        # Passing an HTTP_COOKIE header to publish adds cookies
-        response = self.publish('/', env={
-            'HTTP_COOKIE': '$Version=1, aid=aval; $Path=/sub/folder, bid=bval'
-            })
-        self.assertEquals(response.getStatus(), 200)
-        self.failIf(response._request._cookies.has_key('aid'))
-        self.assertEquals(response._request._cookies['bid'], 'bval')
-
-    def testStickyCookies(self):
-        # Cookies should acumulate during the test
-        response = self.publish('/', env={'HTTP_COOKIE': 'aid=aval;'})
-        self.assertEquals(response.getStatus(), 200)
-
-        # Cookies are implicity passed to further requests in this test
-        response = self.publish('/getcookies')
-        self.assertEquals(response.getStatus(), 200)
-        self.assertEquals(response.getBody().strip(), 'aid=aval')
-
-        # And cookies set in responses also acumulate
-        response = self.publish('/setcookie')
-        self.assertEquals(response.getStatus(), 200)
-        response = self.publish('/getcookies')
-        self.assertEquals(response.getStatus(), 200)
-        self.assertEquals(response.getBody().strip(), 'aid=aval;bid=bval')
-
-
-class SkinsAndHTTPCaller(FunctionalTestCase):
-
-    def test_skins(self):
-        # Regression test for http://zope.org/Collectors/Zope3-dev/353
-        from zope.app.testing.functional import HTTPCaller
-        http = HTTPCaller()
-        response = http("GET /++skin++Basic HTTP/1.1\n\n")
-        self.assert_("zopetopBasic.css" in str(response))
-
-class RetryProblemFunctional(FunctionalTestCase):
-
-    def setUp(self):
-        super(RetryProblemFunctional, self).setUp()
-
-        root = self.getRootFolder()
-
-        root['fail'] = FailingKlass()
-
-        transaction.commit()
-
-    def tearDown(self):
-        root = self.getRootFolder()
-        del root['fail']
-        super(RetryProblemFunctional, self).tearDown()
-
-    def test_retryOnConflictErrorFunctional(self):
-        from zope.app.testing.functional import HTTPCaller
-
-        http = HTTPCaller()
-        response = http(r"""
-GET /@@test-conflict-raise-view.html HTTP/1.1
-Authorization: Basic mgr:mgrpw
-""")
-
-        self.assertNotEqual(response.getStatus(), 599)
-        self.assertEqual(response.getStatus(), 500)
-
-class RetryProblemBrowser(BrowserTestCase):
-    def setUp(self):
-        super(RetryProblemBrowser, self).setUp()
-
-        root = self.getRootFolder()
-
-        root['fail'] = FailingKlass()
-
-        transaction.commit()
-
-    def tearDown(self):
-        root = self.getRootFolder()
-        del root['fail']
-        super(RetryProblemBrowser, self).tearDown()
-
-    def test_retryOnConflictErrorBrowser(self):
-        response = self.publish('/@@test-conflict-raise-view.html',
-                                handle_errors=True)
-        self.assertNotEqual(response.getStatus(), 599)
-        self.assertEqual(response.getStatus(), 500)
-
-def test_suite():
-    checker = RENormalizing([
-        (re.compile(r'^HTTP/1.1 (\d{3}) .*?\n'), 'HTTP/1.1 \\1\n')
-        ])
-    SampleFunctionalTest.layer = AppTestingLayer
-    CookieFunctionalTest.layer = AppTestingLayer
-    SkinsAndHTTPCaller.layer = AppTestingLayer
-    RetryProblemFunctional.layer = AppTestingLayer
-    RetryProblemBrowser.layer = AppTestingLayer
-
-    doc_test = FunctionalDocFileSuite('doctest.txt', checker=checker)
-    doc_test.layer = AppTestingLayer
-
-    return unittest.TestSuite((
-        unittest.makeSuite(FunctionalHTTPDocTest),
-        unittest.makeSuite(AuthHeaderTestCase),
-        unittest.makeSuite(HTTPCallerTestCase),
-        unittest.makeSuite(CookieHandlerTestCase),
-        DocTestSuite(),
-        unittest.makeSuite(SampleFunctionalTest),
-        unittest.makeSuite(CookieFunctionalTest),
-        unittest.makeSuite(SkinsAndHTTPCaller),
-        unittest.makeSuite(RetryProblemFunctional),
-        unittest.makeSuite(RetryProblemBrowser),
-        doc_test,
-        ))
-
-if __name__ == '__main__':
-    unittest.main(defaultTest='test_suite')

Copied: zope.app.testing/tags/3.4.3/src/zope/app/testing/tests.py (from rev 88823, zope.app.testing/trunk/src/zope/app/testing/tests.py)
===================================================================
--- zope.app.testing/tags/3.4.3/src/zope/app/testing/tests.py	                        (rev 0)
+++ zope.app.testing/tags/3.4.3/src/zope/app/testing/tests.py	2008-07-25 17:24:01 UTC (rev 88824)
@@ -0,0 +1,458 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test tcpdoc
+
+$Id$
+"""
+import os
+import re
+import unittest
+import StringIO
+
+from zope.testing.doctestunit import DocTestSuite
+from zope.testing.renormalizing import RENormalizing
+from zope.component import getAllUtilitiesRegisteredFor
+from ZODB.interfaces import IDatabase
+
+import zope.app.testing
+from zope.app.publication.requestpublicationregistry import factoryRegistry
+from zope.app.publication.requestpublicationfactories import BrowserFactory
+from zope.app.testing import functional
+from zope.app.testing.dochttp import dochttp
+import transaction
+from zope.app.testing.functional import SampleFunctionalTest, BrowserTestCase
+from zope.app.testing.functional import FunctionalDocFileSuite
+from zope.app.testing.functional import FunctionalTestCase
+from zope.app.testing.functional import FunctionalTestSetup
+from zope.app.testing.testing import AppTestingLayer
+
+from zope.app.testing.testing import FailingKlass
+
+
+
+HEADERS = """\
+HTTP/1.1 200 OK
+Content-Type: text/plain
+"""
+
+BODY = """\
+This is the response body.
+"""
+
+directory = os.path.join(os.path.split(zope.app.testing.__file__)[0],
+                         'recorded')
+
+expected = r'''
+
+  >>> print http(r"""
+  ... GET /@@contents.html HTTP/1.1
+  ... """)
+  HTTP/1.1 401 Unauthorized
+  Content-Length: 89
+  Content-Type: text/html;charset=utf-8
+  Www-Authenticate: basic realm="Zope"
+  <BLANKLINE>
+  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+        lang="en">
+  <BLANKLINE>
+  ...
+  <BLANKLINE>
+  </html>
+  <BLANKLINE>
+  <BLANKLINE>
+
+
+  >>> print http(r"""
+  ... GET /@@contents.html HTTP/1.1
+  ... Authorization: Basic bWdyOm1ncnB3
+  ... """)
+  HTTP/1.1 200 OK
+  Content-Length: 89
+  Content-Type: text/html;charset=utf-8
+  <BLANKLINE>
+  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+        lang="en">
+  <BLANKLINE>
+  ...
+  <BLANKLINE>
+  </html>
+  <BLANKLINE>
+  <BLANKLINE>
+
+
+  >>> print http(r"""
+  ... GET /++etc++site/@@manage HTTP/1.1
+  ... Authorization: Basic bWdyOm1ncnB3
+  ... Referer: http://localhost:8081/
+  ... """)
+  HTTP/1.1 303 See Other
+  Content-Length: 0
+  Content-Type: text/plain;charset=utf-8
+  Location: @@tasks.html
+  <BLANKLINE>
+
+
+  >>> print http(r"""
+  ... GET / HTTP/1.1
+  ... Authorization: Basic bWdyOm1ncnB3
+  ... """)
+  HTTP/1.1 200 OK
+  Content-Length: 89
+  Content-Type: text/html;charset=utf-8
+  <BLANKLINE>
+  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+        lang="en">
+  <BLANKLINE>
+  ...
+  <BLANKLINE>
+  </html>
+  <BLANKLINE>
+  <BLANKLINE>
+
+
+  >>> print http(r"""
+  ... GET /++etc++site/@@tasks.html HTTP/1.1
+  ... Authorization: Basic bWdyOm1ncnB3
+  ... Referer: http://localhost:8081/
+  ... """)
+  HTTP/1.1 200 OK
+  Content-Length: 89
+  Content-Type: text/html;charset=utf-8
+  <BLANKLINE>
+  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+        lang="en">
+  <BLANKLINE>
+  ...
+  <BLANKLINE>
+  </html>
+  <BLANKLINE>
+  <BLANKLINE>
+'''
+
+
+class FunctionalHTTPDocTest(unittest.TestCase):
+
+    def test_dochttp(self):
+        import sys
+        old = sys.stdout
+        sys.stdout = StringIO.StringIO()
+        dochttp(['-p', 'test', directory])
+        got = sys.stdout.getvalue()
+        sys.stdout = old
+        self.assertEquals(expected, got)
+
+
+class AuthHeaderTestCase(unittest.TestCase):
+
+    def test_auth_encoded(self):
+        auth_header = functional.auth_header
+        header = 'Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3'
+        self.assertEquals(auth_header(header), header)
+
+    def test_auth_non_encoded(self):
+        auth_header = functional.auth_header
+        header = 'Basic globalmgr:globalmgrpw'
+        expected = 'Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3'
+        self.assertEquals(auth_header(header), expected)
+
+    def test_auth_non_encoded_empty(self):
+        auth_header = functional.auth_header
+        header = 'Basic globalmgr:'
+        expected = 'Basic Z2xvYmFsbWdyOg=='
+        self.assertEquals(auth_header(header), expected)
+        header = 'Basic :pass'
+        expected = 'Basic OnBhc3M='
+        self.assertEquals(auth_header(header), expected)
+
+    def test_auth_non_encoded_colon(self):
+        auth_header = zope.app.testing.functional.auth_header
+        header = 'Basic globalmgr:pass:pass'
+        expected = 'Basic Z2xvYmFsbWdyOnBhc3M6cGFzcw=='
+        self.assertEquals(auth_header(header), expected)
+
+
+class HTTPCallerTestCase(unittest.TestCase):
+
+    def test_chooseRequestClass(self):
+        from zope.publisher.interfaces import IRequest, IPublication
+
+        factoryRegistry.register('GET', '*', 'browser', 0, BrowserFactory())
+
+        caller = functional.HTTPCaller()
+        request_class, publication_class = caller.chooseRequestClass(
+            method='GET', path='/', environment={})
+
+        self.assert_(IRequest.implementedBy(request_class))
+        self.assert_(IPublication.implementedBy(publication_class))
+
+
+class DummyCookiesResponse(object):
+    # Ugh, this simulates the *internals* of a HTTPResponse object
+    # TODO: expand the IHTTPResponse interface to give access to all cookies
+    _cookies = None
+
+    def __init__(self, cookies=None):
+        if not cookies:
+            cookies = {}
+        self._cookies = cookies
+
+
+class CookieHandlerTestCase(unittest.TestCase):
+    def setUp(self):
+        self.handler = functional.CookieHandler()
+
+    def test_saveCookies(self):
+        response = DummyCookiesResponse(dict(
+            spam=dict(value='eggs', path='/foo', comment='rest is ignored'),
+            monty=dict(value='python')))
+        self.handler.saveCookies(response)
+        self.assertEqual(len(self.handler.cookies), 2)
+        self.assert_(self.handler.cookies['spam'].OutputString() in
+                         ('spam=eggs; Path=/foo;','spam=eggs; Path=/foo'))
+        self.assert_(self.handler.cookies['monty'].OutputString() in
+                         ('monty=python;','monty=python'))
+
+    def test_httpCookie(self):
+        cookies = self.handler.cookies
+        cookies['spam'] = 'eggs'
+        cookies['spam']['path'] = '/foo'
+        cookies['bar'] = 'baz'
+        cookies['bar']['path'] = '/foo/baz'
+        cookies['monty'] = 'python'
+
+        cookieHeader = self.handler.httpCookie('/foo/bar')
+        parts = cookieHeader.split('; ')
+        parts.sort()
+        self.assertEqual(parts, ['monty=python', 'spam=eggs'])
+
+        cookieHeader = self.handler.httpCookie('/foo/baz')
+        parts = cookieHeader.split('; ')
+        parts.sort()
+        self.assertEqual(parts, ['bar=baz', 'monty=python', 'spam=eggs'])
+
+    # There is no test for CookieHandler.loadCookies because that method
+    # only passes the arguments on to Cookie.BaseCookie.load, which the
+    # standard library has tests for (we hope).
+
+
+class CookieFunctionalTest(BrowserTestCase):
+
+    """Functional tests should handle cookies like a web browser
+
+    Multiple requests in the same test should accumulate cookies.
+    We also ensure that cookies with path values are only sent for
+    the correct URLs so we can test cookies don't 'leak'. Expiry,
+    secure and other cookie attributes are not being worried about
+    at the moment.
+
+    """
+
+    def setUp(self):
+        super(CookieFunctionalTest, self).setUp()
+        self.assertEqual(
+                len(self.cookies.keys()), 0,
+                'cookies store should be empty'
+                )
+
+        root = self.getRootFolder()
+
+        from zope.app.zptpage.zptpage import ZPTPage
+
+        page = ZPTPage()
+
+        page.source = u'''<tal:tag tal:define="
+        cookies python:['%s=%s'%(k,v) for k,v in request.getCookies().items()]"
+        ><tal:tag tal:define="
+        ignored python:cookies.sort()"
+        /><span tal:replace="python:';'.join(cookies)" /></tal:tag>'''
+
+        root['getcookies'] = page
+
+        page = ZPTPage()
+
+        page.source = u'''<tal:tag tal:define="
+            ignored python:request.response.setCookie('bid','bval')" >
+            <h1 tal:condition="ignored" />
+            </tal:tag>'''
+
+        root['setcookie'] = page
+        transaction.commit()
+
+    def tearDown(self):
+        root = self.getRootFolder()
+        del root['getcookies']
+        del root['setcookie']
+        super(CookieFunctionalTest, self).tearDown()
+
+    def testDefaultCookies(self):
+        # By default no cookies are set
+        response = self.publish('/')
+        self.assertEquals(response.getStatus(), 200)
+        self.assert_(not response._request._cookies)
+
+    def testSimpleCookies(self):
+        self.cookies['aid'] = 'aval'
+        response = self.publish('/')
+        self.assertEquals(response.getStatus(), 200)
+        self.assertEquals(response._request._cookies['aid'], 'aval')
+
+    def testCookiePaths(self):
+        # We only send cookies if the path is correct
+        self.cookies['aid'] = 'aval'
+        self.cookies['aid']['Path'] = '/sub/folder'
+        self.cookies['bid'] = 'bval'
+        response = self.publish('/')
+
+        self.assertEquals(response.getStatus(), 200)
+        self.assert_(not response._request._cookies.has_key('aid'))
+        self.assertEquals(response._request._cookies['bid'], 'bval')
+
+    def testHttpCookieHeader(self):
+        # Passing an HTTP_COOKIE header to publish adds cookies
+        response = self.publish('/', env={
+            'HTTP_COOKIE': '$Version=1, aid=aval; $Path=/sub/folder, bid=bval'
+            })
+        self.assertEquals(response.getStatus(), 200)
+        self.failIf(response._request._cookies.has_key('aid'))
+        self.assertEquals(response._request._cookies['bid'], 'bval')
+
+    def testStickyCookies(self):
+        # Cookies should accumulate during the test
+        response = self.publish('/', env={'HTTP_COOKIE': 'aid=aval;'})
+        self.assertEquals(response.getStatus(), 200)
+
+        # Cookies are implicitly passed to further requests in this test
+        response = self.publish('/getcookies')
+        self.assertEquals(response.getStatus(), 200)
+        self.assertEquals(response.getBody().strip(), 'aid=aval')
+
+        # And cookies set in responses also accumulate
+        response = self.publish('/setcookie')
+        self.assertEquals(response.getStatus(), 200)
+        response = self.publish('/getcookies')
+        self.assertEquals(response.getStatus(), 200)
+        self.assertEquals(response.getBody().strip(), 'aid=aval;bid=bval')
+
+
+class SkinsAndHTTPCaller(FunctionalTestCase):
+
+    def test_skins(self):
+        # Regression test for http://zope.org/Collectors/Zope3-dev/353
+        from zope.app.testing.functional import HTTPCaller
+        http = HTTPCaller()
+        response = http("GET /++skin++Basic HTTP/1.1\n\n")
+        self.assert_("zopetopBasic.css" in str(response))
+
+class RetryProblemFunctional(FunctionalTestCase):
+
+    def setUp(self):
+        super(RetryProblemFunctional, self).setUp()
+
+        root = self.getRootFolder()
+
+        root['fail'] = FailingKlass()
+
+        transaction.commit()
+
+    def tearDown(self):
+        root = self.getRootFolder()
+        del root['fail']
+        super(RetryProblemFunctional, self).tearDown()
+
+    def test_retryOnConflictErrorFunctional(self):
+        from zope.app.testing.functional import HTTPCaller
+
+        http = HTTPCaller()
+        response = http(r"""
+GET /@@test-conflict-raise-view.html HTTP/1.1
+Authorization: Basic mgr:mgrpw
+""")
+
+        self.assertNotEqual(response.getStatus(), 599)
+        self.assertEqual(response.getStatus(), 500)
+
+class RetryProblemBrowser(BrowserTestCase):
+    def setUp(self):
+        super(RetryProblemBrowser, self).setUp()
+
+        root = self.getRootFolder()
+
+        root['fail'] = FailingKlass()
+
+        transaction.commit()
+
+    def tearDown(self):
+        root = self.getRootFolder()
+        del root['fail']
+        super(RetryProblemBrowser, self).tearDown()
+
+    def test_retryOnConflictErrorBrowser(self):
+        response = self.publish('/@@test-conflict-raise-view.html',
+                                handle_errors=True)
+        self.assertNotEqual(response.getStatus(), 599)
+        self.assertEqual(response.getStatus(), 500)
+
+
+ftesting_zcml = os.path.join(os.path.split(zope.app.testing.__file__)[0],
+                             'ftesting.zcml')
+
+def doctest_FunctionalTestSetup_clears_global_utilities():
+    """Test that FunctionalTestSetup doesn't leave global utilities.
+
+    Leaving global IDatabase utilities makes a nice juicy memory leak.
+    See https://bugs.launchpad.net/zope3/+bug/251273
+
+        >>> setup = FunctionalTestSetup(ftesting_zcml)
+        >>> setup.setUp()
+        >>> setup.tearDown()
+
+        >>> len(getAllUtilitiesRegisteredFor(IDatabase))
+        0
+
+    Clean up:
+
+        >>> setup.tearDownCompletely()
+
+    """
+
+
+def test_suite():
+    checker = RENormalizing([
+        (re.compile(r'^HTTP/1.1 (\d{3}) .*?\n'), 'HTTP/1.1 \\1\n')
+        ])
+    SampleFunctionalTest.layer = AppTestingLayer
+    CookieFunctionalTest.layer = AppTestingLayer
+    SkinsAndHTTPCaller.layer = AppTestingLayer
+    RetryProblemFunctional.layer = AppTestingLayer
+    RetryProblemBrowser.layer = AppTestingLayer
+
+    doc_test = FunctionalDocFileSuite('doctest.txt', checker=checker)
+    doc_test.layer = AppTestingLayer
+
+    return unittest.TestSuite((
+        unittest.makeSuite(FunctionalHTTPDocTest),
+        unittest.makeSuite(AuthHeaderTestCase),
+        unittest.makeSuite(HTTPCallerTestCase),
+        unittest.makeSuite(CookieHandlerTestCase),
+        DocTestSuite(),
+        unittest.makeSuite(SampleFunctionalTest),
+        unittest.makeSuite(CookieFunctionalTest),
+        unittest.makeSuite(SkinsAndHTTPCaller),
+        unittest.makeSuite(RetryProblemFunctional),
+        unittest.makeSuite(RetryProblemBrowser),
+        doc_test,
+        ))
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')



More information about the Checkins mailing list