[Checkins] SVN: zope.publisher/tags/3.5.7/ Tag 3.5.7 release.
Tres Seaver
tseaver at palladion.com
Mon Apr 6 11:58:34 EDT 2009
Log message for revision 98933:
Tag 3.5.7 release.
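
Note on the change carried by this tag: the 3.5.7 entry in CHANGES.txt says that any ``QUERY_STRING`` submitted with a POST request is moved to a new environment key, ``X-POST-QUERY_STRING``, so that the Python 2.6 version of ``cgi.FieldStorage`` does not concatenate it onto the form data. The following is only a minimal sketch of that idea, not the code from browser.py. The function name ``move_post_query_string`` is hypothetical, and leaving an empty ``QUERY_STRING`` behind is an assumption based on the existing comment in ``processInputs`` about FieldStorage falling back to ``sys.argv[1]`` when the key is missing.

```python
def move_post_query_string(environ):
    """Return a copy of a CGI-style environ with a POST query string set aside.

    Hypothetical helper for illustration; the real change lives in
    zope.publisher.browser and may differ in detail.
    """
    env = dict(environ)  # work on a copy so the caller's mapping is untouched
    if env.get('REQUEST_METHOD', 'GET').upper() == 'POST':
        query = env.pop('QUERY_STRING', '')
        if query:
            # Key name taken from the 3.5.7 changelog entry.
            env['X-POST-QUERY_STRING'] = query
    # Assumption: keep an (empty) QUERY_STRING so a form parser never
    # looks elsewhere for one.
    env.setdefault('QUERY_STRING', '')
    return env


post_env = move_post_query_string(
    {'REQUEST_METHOD': 'POST', 'QUERY_STRING': 'a=1'})
get_env = move_post_query_string(
    {'REQUEST_METHOD': 'GET', 'QUERY_STRING': 'a=1'})
```

With this sketch, ``post_env`` carries the original query string under ``X-POST-QUERY_STRING`` and an empty ``QUERY_STRING``, while ``get_env`` is unchanged.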
Changed:
A zope.publisher/tags/3.5.7/
D zope.publisher/tags/3.5.7/CHANGES.txt
A zope.publisher/tags/3.5.7/CHANGES.txt
D zope.publisher/tags/3.5.7/buildout.cfg
A zope.publisher/tags/3.5.7/buildout.cfg
D zope.publisher/tags/3.5.7/setup.py
A zope.publisher/tags/3.5.7/setup.py
D zope.publisher/tags/3.5.7/src/zope/publisher/browser.py
A zope.publisher/tags/3.5.7/src/zope/publisher/browser.py
D zope.publisher/tags/3.5.7/src/zope/publisher/tests/test_browserrequest.py
A zope.publisher/tags/3.5.7/src/zope/publisher/tests/test_browserrequest.py
-=-
Deleted: zope.publisher/tags/3.5.7/CHANGES.txt
===================================================================
--- zope.publisher/branches/3.5/CHANGES.txt 2009-04-05 15:29:02 UTC (rev 98879)
+++ zope.publisher/tags/3.5.7/CHANGES.txt 2009-04-06 15:58:33 UTC (rev 98933)
@@ -1,139 +0,0 @@
-CHANGES
-=======
-
-3.5.6 (2009-02-14)
-------------------
-
-Bugs fixed:
-
-* An untested code path that incorrectly attempted to construct a NotFound was
- fixed, with a test.
-
-
-3.5.5 (2009-02-04)
-------------------
-
-* LP #322486: setStatus() now allows any int()-able status value.
-
-
-3.5.4 (2008-09-22)
-------------------
-
-Bugs fixed:
-
-* LP #98440: interfaces lost on retried request
-
-* LP #273296: dealing more nicely with malformed HTTP_ACCEPT_LANGUAGE headers
- within getPreferredLanguages().
-
-* LP #253362: dealing more nicely with malformed HTTP_ACCEPT_CHARSET headers
- within getPreferredCharsets().
-
-* LP #98284: Pass the ``size`` argument to readline, as the version of
- twisted used in zope.app.twisted supports it.
-
-* Fix the LP #98284 fix: do not pass ``size`` argument of None that causes
- cStringIO objects to barf with a TypeError.
-
-
-3.5.3 (2008-06-20)
-------------------
-
-Bugs fixed:
-
-* It turns out that some Web servers (Paste for example) do not send the EOF
- character after the data has been transmitted and the read() of the cached
- stream simply hangs if no expected content length has been specified.
-
-
-3.5.2 (2008-04-06)
-------------------
-
-Bugs fixed:
-
-* A previous fix to handle posting of non-form data broke handling of
- form data with extra information in the content type, as in::
-
- application/x-www-form-urlencoded; charset=UTF-8
-
-3.5.1 (2008-03-23)
-------------------
-
-Bugs fixed:
-
-* When posting non-form (and non-multipart) data, the request body was
- consumed and discarded. This makes it impossible to deal with other
- post types, like xml-rpc or json without resorting to overly complex
- "request factory" contortions.
-
-* https://bugs.launchpad.net/zope2/+bug/143873
-
- The zope.publisher.http.HTTPCharsets was confused by the Zope 2
- publisher, which gives misleading information about which headers
- it has.
-
-3.5.0 (2008-03-02)
-------------------
-
-Features added:
-
-* Added a PasteDeploy app_factory implementation. This should make
- it easier to integrate Zope 3 applications with PasteDeploy. It
- also makes it easier to control the publication used, giving far
- greater control over application policies (e.g. whether or not to
- use the ZODB).
-
-3.4.2 (2007-12-07)
-------------------
-
-* Made segmentation of URLs not strip (trailing) whitespace from path segments
- to allow URLs ending in %20 to be handled correctly. (#172742)
-
-3.4.1 (2007-09-29)
-------------------
-
-No changes since 3.4.1b2.
-
-3.4.1b2 (2007-08-02)
---------------------
-
-* zope.publisher now works on Python 2.5.
-
-* Fix a problem with request.get() when the object that's to be
- retrieved is the request itself.
-
-
-3.4.1b1 (2007-07-13)
---------------------
-
-No changes.
-
-
-3.4.0b2 (2007-07-05)
---------------------
-
-* Fix https://bugs.launchpad.net/zope3/+bug/122054:
- HTTPInputStream understands both the CONTENT_LENGTH and
- HTTP_CONTENT_LENGTH environment variables. It is also now tolerant
- of empty strings and will treat those as if the variable were
- absent.
-
-
-3.4.0b1 (2007-07-05)
---------------------
-
-* Fix caching issue. The input stream never got cached in a temp file
- because of a wrong content-length header lookup. Added CONTENT_LENGTH
- header check in addition to the previously used HTTP_CONTENT_LENGTH. The
- ``HTTP_`` prefix is sometimes added by some CGI proxies, but CONTENT_LENGTH
- is the right header info for the size.
-
-* Fix https://bugs.launchpad.net/zope3/+bug/98413:
- HTTPResponse.handleException should set the content type
-
-
-3.4.0a1 (2007-04-22)
---------------------
-
-Initial release as a separate project, corresponds to zope.publisher
-from Zope 3.4.0a1
Copied: zope.publisher/tags/3.5.7/CHANGES.txt (from rev 98932, zope.publisher/branches/3.5/CHANGES.txt)
===================================================================
--- zope.publisher/tags/3.5.7/CHANGES.txt (rev 0)
+++ zope.publisher/tags/3.5.7/CHANGES.txt 2009-04-06 15:58:33 UTC (rev 98933)
@@ -0,0 +1,148 @@
+CHANGES
+=======
+
+3.5.7 (2009-04-06)
+------------------
+
+- Move any ``QUERY_STRING`` submitted with a POST request to a new key in
+ the environment, ``X-POST-QUERY_STRING``, to prevent the Python 2.6 version
+ of ``cgi.FieldStorage`` from concatenating it onto the form data.
+
+- Pin buildout to the Zope 3.4.0 KGS index.
+
+3.5.6 (2009-02-14)
+------------------
+
+Bugs fixed:
+
+* An untested code path that incorrectly attempted to construct a NotFound was
+ fixed, with a test.
+
+
+3.5.5 (2009-02-04)
+------------------
+
+* LP #322486: setStatus() now allows any int()-able status value.
+
+
+3.5.4 (2008-09-22)
+------------------
+
+Bugs fixed:
+
+* LP #98440: interfaces lost on retried request
+
+* LP #273296: dealing more nicely with malformed HTTP_ACCEPT_LANGUAGE headers
+ within getPreferredLanguages().
+
+* LP #253362: dealing more nicely with malformed HTTP_ACCEPT_CHARSET headers
+ within getPreferredCharsets().
+
+* LP #98284: Pass the ``size`` argument to readline, as the version of
+ twisted used in zope.app.twisted supports it.
+
+* Fix the LP #98284 fix: do not pass ``size`` argument of None that causes
+ cStringIO objects to barf with a TypeError.
+
+
+3.5.3 (2008-06-20)
+------------------
+
+Bugs fixed:
+
+* It turns out that some Web servers (Paste for example) do not send the EOF
+ character after the data has been transmitted and the read() of the cached
+ stream simply hangs if no expected content length has been specified.
+
+
+3.5.2 (2008-04-06)
+------------------
+
+Bugs fixed:
+
+* A previous fix to handle posting of non-form data broke handling of
+ form data with extra information in the content type, as in::
+
+ application/x-www-form-urlencoded; charset=UTF-8
+
+3.5.1 (2008-03-23)
+------------------
+
+Bugs fixed:
+
+* When posting non-form (and non-multipart) data, the request body was
+ consumed and discarded. This makes it impossible to deal with other
+ post types, like xml-rpc or json without resorting to overly complex
+ "request factory" contortions.
+
+* https://bugs.launchpad.net/zope2/+bug/143873
+
+ The zope.publisher.http.HTTPCharsets was confused by the Zope 2
+ publisher, which gives misleading information about which headers
+ it has.
+
+3.5.0 (2008-03-02)
+------------------
+
+Features added:
+
+* Added a PasteDeploy app_factory implementation. This should make
+ it easier to integrate Zope 3 applications with PasteDeploy. It
+ also makes it easier to control the publication used, giving far
+ greater control over application policies (e.g. whether or not to
+ use the ZODB).
+
+3.4.2 (2007-12-07)
+------------------
+
+* Made segmentation of URLs not strip (trailing) whitespace from path segments
+ to allow URLs ending in %20 to be handled correctly. (#172742)
+
+3.4.1 (2007-09-29)
+------------------
+
+No changes since 3.4.1b2.
+
+3.4.1b2 (2007-08-02)
+--------------------
+
+* zope.publisher now works on Python 2.5.
+
+* Fix a problem with request.get() when the object that's to be
+ retrieved is the request itself.
+
+
+3.4.1b1 (2007-07-13)
+--------------------
+
+No changes.
+
+
+3.4.0b2 (2007-07-05)
+--------------------
+
+* Fix https://bugs.launchpad.net/zope3/+bug/122054:
+ HTTPInputStream understands both the CONTENT_LENGTH and
+ HTTP_CONTENT_LENGTH environment variables. It is also now tolerant
+ of empty strings and will treat those as if the variable were
+ absent.
+
+
+3.4.0b1 (2007-07-05)
+--------------------
+
+* Fix caching issue. The input stream never got cached in a temp file
+ because of a wrong content-length header lookup. Added CONTENT_LENGTH
+ header check in addition to the previously used HTTP_CONTENT_LENGTH. The
+ ``HTTP_`` prefix is sometimes added by some CGI proxies, but CONTENT_LENGTH
+ is the right header info for the size.
+
+* Fix https://bugs.launchpad.net/zope3/+bug/98413:
+ HTTPResponse.handleException should set the content type
+
+
+3.4.0a1 (2007-04-22)
+--------------------
+
+Initial release as a separate project, corresponds to zope.publisher
+from Zope 3.4.0a1
Deleted: zope.publisher/tags/3.5.7/buildout.cfg
===================================================================
--- zope.publisher/branches/3.5/buildout.cfg 2009-04-05 15:29:02 UTC (rev 98879)
+++ zope.publisher/tags/3.5.7/buildout.cfg 2009-04-06 15:58:33 UTC (rev 98933)
@@ -1,9 +0,0 @@
-[buildout]
-develop = .
-parts = test
-
-find-links = http://download.zope.org/distribution/
-
-[test]
-recipe = zc.recipe.testrunner
-eggs = zope.publisher [test]
Copied: zope.publisher/tags/3.5.7/buildout.cfg (from rev 98931, zope.publisher/branches/3.5/buildout.cfg)
===================================================================
--- zope.publisher/tags/3.5.7/buildout.cfg (rev 0)
+++ zope.publisher/tags/3.5.7/buildout.cfg 2009-04-06 15:58:33 UTC (rev 98933)
@@ -0,0 +1,9 @@
+[buildout]
+develop = .
+parts = test
+
+index = http://download.zope.org/zope3.4/3.4.0/index/
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = zope.publisher [test]
Deleted: zope.publisher/tags/3.5.7/setup.py
===================================================================
--- zope.publisher/branches/3.5/setup.py 2009-04-05 15:29:02 UTC (rev 98879)
+++ zope.publisher/tags/3.5.7/setup.py 2009-04-06 15:58:33 UTC (rev 98933)
@@ -1,61 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2006 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-
-import os
-from setuptools import setup, find_packages
-
-entry_points = """
-[paste.app_factory]
-main = zope.publisher.paste:Application
-
-[zope.publisher.publication_factory]
-sample = zope.publisher.tests.test_paste:SamplePublication
-"""
-
-setup(name='zope.publisher',
- version = '3.5.6',
- url='http://pypi.python.org/pypi/zope.publisher',
- license='ZPL 2.1',
- author='Zope Corporation and Contributors',
- author_email='zope-dev@zope.org',
- description="The Zope publisher publishes Python objects on the web.",
- long_description=(open('README.txt').read()
- + '\n\n'
- + open('CHANGES.txt').read()),
-
- entry_points = entry_points,
-
- packages=find_packages('src'),
- package_dir = {'': 'src'},
-
- namespace_packages=['zope',],
- install_requires=['setuptools',
- 'zope.component',
- 'zope.event',
- 'zope.exceptions',
- 'zope.i18n',
- 'zope.interface',
- 'zope.location',
- 'zope.proxy',
- 'zope.security',
- 'zope.deprecation',
- 'zope.deferredimport'],
- extras_require=dict(
- test = ['zope.testing',
- 'zope.app.testing'],
- ),
- include_package_data = True,
-
- zip_safe = False,
- )
Copied: zope.publisher/tags/3.5.7/setup.py (from rev 98931, zope.publisher/branches/3.5/setup.py)
===================================================================
--- zope.publisher/tags/3.5.7/setup.py (rev 0)
+++ zope.publisher/tags/3.5.7/setup.py 2009-04-06 15:58:33 UTC (rev 98933)
@@ -0,0 +1,60 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import os
+from setuptools import setup, find_packages
+
+entry_points = """
+[paste.app_factory]
+main = zope.publisher.paste:Application
+
+[zope.publisher.publication_factory]
+sample = zope.publisher.tests.test_paste:SamplePublication
+"""
+
+setup(name='zope.publisher',
+ version = '3.5.7',
+ url='http://pypi.python.org/pypi/zope.publisher',
+ license='ZPL 2.1',
+ author='Zope Corporation and Contributors',
+ author_email='zope-dev@zope.org',
+ description="The Zope publisher publishes Python objects on the web.",
+ long_description=(open('README.txt').read() + '\n\n'
+ + open('CHANGES.txt').read()),
+ entry_points = entry_points,
+ packages=find_packages('src'),
+ package_dir = {'': 'src'},
+ namespace_packages=['zope',],
+ include_package_data = True,
+ zip_safe = False,
+ install_requires=[
+ 'setuptools',
+ 'zope.component',
+ 'zope.event',
+ 'zope.exceptions',
+ 'zope.i18n',
+ 'zope.interface',
+ 'zope.location',
+ 'zope.proxy',
+ 'zope.security',
+ 'zope.deprecation',
+ 'zope.deferredimport',
+ ],
+ extras_require={
+ 'test': [
+ 'zope.testing',
+ 'zope.app.testing',
+ ],
+ },
+)
Deleted: zope.publisher/tags/3.5.7/src/zope/publisher/browser.py
===================================================================
--- zope.publisher/branches/3.5/src/zope/publisher/browser.py 2009-04-05 15:29:02 UTC (rev 98879)
+++ zope.publisher/tags/3.5.7/src/zope/publisher/browser.py 2009-04-06 15:58:33 UTC (rev 98933)
@@ -1,1032 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Browser-specific Publisher classes
-
-Here we define the specific 'BrowserRequest' and 'BrowserResponse' classes. The
-big improvement of the 'BrowserRequest' to 'HTTPRequest' is that it can handle
-HTML form data and convert them into a Python-native format. Even file data is
-packaged into a nice, Python-friendly 'FileUpload' object.
-
-$Id$
-"""
-__docformat__ = 'restructuredtext'
-
-import re
-from types import ListType, TupleType, StringType
-from cgi import FieldStorage
-import tempfile
-
-import zope.component
-from zope.interface import implements, directlyProvides
-from zope.interface import directlyProvidedBy, providedBy
-from zope.i18n.interfaces import IUserPreferredLanguages
-from zope.i18n.interfaces import IUserPreferredCharsets
-from zope.location import Location
-
-from zope.publisher.interfaces import NotFound
-from zope.publisher.interfaces.browser import IBrowserRequest
-from zope.publisher.interfaces.browser import IDefaultBrowserLayer
-from zope.publisher.interfaces.browser import IDefaultSkin
-from zope.publisher.interfaces.browser import IBrowserApplicationRequest
-from zope.publisher.interfaces.browser import IBrowserView
-from zope.publisher.interfaces.browser import IBrowserPage
-from zope.publisher.interfaces.browser import IBrowserSkinType
-from zope.publisher.interfaces.browser import ISkinChangedEvent
-from zope.publisher.interfaces.http import IHTTPRequest
-from zope.publisher.http import HTTPRequest, HTTPResponse
-
-__ArrayTypes = (ListType, TupleType)
-
-start_of_header_search=re.compile('(<head[^>]*>)', re.I).search
-base_re_search=re.compile('(<base.*?>)',re.I).search
-isRelative = re.compile("[-_.!~*a-zA-z0-9'()@&=+$,]+(/|$)").match
-newlines = re.compile('\r\n|\n\r|\r')
-
-def is_text_html(content_type):
- return content_type.startswith('text/html')
-
-# Flag Constants
-SEQUENCE = 1
-DEFAULT = 2
-RECORD = 4
-RECORDS = 8
-REC = RECORD | RECORDS
-CONVERTED = 32
-DEFAULTABLE_METHODS = 'GET', 'POST', 'HEAD'
-
-
-def field2string(v):
- if hasattr(v, 'read'):
- return v.read()
- return str(v)
-
-def field2text(v, nl=newlines):
- return nl.sub("\n", field2string(v))
-
-def field2required(v):
- v = field2string(v)
- if not v.strip():
- raise ValueError('No input for required field<p>')
- return v
-
-def field2int(v):
- if isinstance(v, __ArrayTypes):
- return map(field2int, v)
- v = field2string(v)
- if not v:
- raise ValueError('Empty entry when <strong>integer</strong> expected')
- try:
- return int(v)
- except ValueError:
- raise ValueError("An integer was expected in the value '%s'" % v)
-
-def field2float(v):
- if isinstance(v, __ArrayTypes):
- return map(field2float, v)
- v = field2string(v)
- if not v:
- raise ValueError(
- 'Empty entry when <strong>floating-point number</strong> expected')
- try:
- return float(v)
- except ValueError:
- raise ValueError(
- "A floating-point number was expected in the value '%s'" % v)
-
-def field2long(v):
- if isinstance(v, __ArrayTypes):
- return map(field2long, v)
- v = field2string(v)
-
- # handle trailing 'L' if present.
- if v and v[-1].upper() == 'L':
- v = v[:-1]
- if not v:
- raise ValueError('Empty entry when <strong>integer</strong> expected')
- try:
- return long(v)
- except ValueError:
- raise ValueError("A long integer was expected in the value '%s'" % v)
-
-def field2tokens(v):
- return field2string(v).split()
-
-def field2lines(v):
- if isinstance(v, __ArrayTypes):
- return [str(item) for item in v]
- return field2text(v).splitlines()
-
-def field2boolean(v):
- return bool(v)
-
-type_converters = {
- 'float': field2float,
- 'int': field2int,
- 'long': field2long,
- 'string': field2string,
- 'required': field2required,
- 'tokens': field2tokens,
- 'lines': field2lines,
- 'text': field2text,
- 'boolean': field2boolean,
- }
-
-get_converter = type_converters.get
-
-def registerTypeConverter(field_type, converter, replace=False):
- """Add a custom type converter to the registry.
-
- o If 'replace' is not true, raise a KeyError if a converter is
- already registered for 'field_type'.
- """
- existing = type_converters.get(field_type)
-
- if existing is not None and not replace:
- raise KeyError('Existing converter for field_type: %s' % field_type)
-
- type_converters[field_type] = converter
-
-
-isCGI_NAME = {
- # These fields are placed in request.environ instead of request.form.
- 'SERVER_SOFTWARE' : 1,
- 'SERVER_NAME' : 1,
- 'GATEWAY_INTERFACE' : 1,
- 'SERVER_PROTOCOL' : 1,
- 'SERVER_PORT' : 1,
- 'REQUEST_METHOD' : 1,
- 'PATH_INFO' : 1,
- 'PATH_TRANSLATED' : 1,
- 'SCRIPT_NAME' : 1,
- 'QUERY_STRING' : 1,
- 'REMOTE_HOST' : 1,
- 'REMOTE_ADDR' : 1,
- 'AUTH_TYPE' : 1,
- 'REMOTE_USER' : 1,
- 'REMOTE_IDENT' : 1,
- 'CONTENT_TYPE' : 1,
- 'CONTENT_LENGTH' : 1,
- 'SERVER_URL': 1,
- }.has_key
-
-hide_key={
- 'HTTP_AUTHORIZATION':1,
- 'HTTP_CGI_AUTHORIZATION': 1,
- }.has_key
-
-class Record(object):
-
- _attrs = frozenset(('get', 'keys', 'items', 'values', 'copy',
- 'has_key', '__contains__'))
-
- def __getattr__(self, key, default=None):
- if key in self._attrs:
- return getattr(self.__dict__, key)
- raise AttributeError(key)
-
- def __getitem__(self, key):
- return self.__dict__[key]
-
- def __str__(self):
- items = self.__dict__.items()
- items.sort()
- return "{" + ", ".join(["%s: %s" % item for item in items]) + "}"
-
- def __repr__(self):
- items = self.__dict__.items()
- items.sort()
- return ("{"
- + ", ".join(["%s: %s" % (key, repr(value))
- for key, value in items]) + "}")
-
-_get_or_head = 'GET', 'HEAD'
-class BrowserRequest(HTTPRequest):
- implements(IBrowserRequest, IBrowserApplicationRequest)
-
- __slots__ = (
- '__provides__', # Allow request to directly provide interfaces
- 'form', # Form data
- 'charsets', # helper attribute
- '__meth',
- '__tuple_items',
- '__defaults',
- '__annotations__',
- )
-
- # Set this to True in a subclass to redirect GET requests when the
- # effective and actual URLs differ.
- use_redirect = False
-
- def __init__(self, body_instream, environ, response=None):
- self.form = {}
- self.charsets = None
- super(BrowserRequest, self).__init__(body_instream, environ, response)
-
-
- def _createResponse(self):
- return BrowserResponse()
-
- def _decode(self, text):
- """Try to decode the text using one of the available charsets."""
- if self.charsets is None:
- envadapter = IUserPreferredCharsets(self)
- self.charsets = envadapter.getPreferredCharsets() or ['utf-8']
- for charset in self.charsets:
- try:
- text = unicode(text, charset)
- break
- except UnicodeError:
- pass
- return text
-
- def processInputs(self):
- 'See IPublisherRequest'
-
- if self.method not in _get_or_head:
- # Process self.form if not a GET request.
- fp = self._body_instream
- if self.method == 'POST':
- content_type = self._environ.get('CONTENT_TYPE')
- if content_type and not (
- content_type.startswith('application/x-www-form-urlencoded')
- or
- content_type.startswith('multipart/')
- ):
- # for non-multi and non-form content types, FieldStorage
- # consumes the body and we have no good place to put it.
- # So we just won't call FieldStorage. :)
- return
- else:
- fp = None
-
- # If 'QUERY_STRING' is not present in self._environ
- # FieldStorage will try to get it from sys.argv[1]
- # which is not what we need.
- if 'QUERY_STRING' not in self._environ:
- self._environ['QUERY_STRING'] = ''
-
- fs = ZopeFieldStorage(fp=fp, environ=self._environ,
- keep_blank_values=1)
-
- fslist = getattr(fs, 'list', None)
- if fslist is not None:
- self.__meth = None
- self.__tuple_items = {}
- self.__defaults = {}
-
- # process all entries in the field storage (form)
- for item in fslist:
- self.__processItem(item)
-
- if self.__defaults:
- self.__insertDefaults()
-
- if self.__tuple_items:
- self.__convertToTuples()
-
- if self.__meth:
- self.setPathSuffix((self.__meth,))
-
- _typeFormat = re.compile('([a-zA-Z][a-zA-Z0-9_]+|\\.[xy])$')
-
- def __processItem(self, item):
- """Process item in the field storage."""
-
- # Check whether this field is a file upload object
- # Note: A field exists for files, even if no filename was
- # passed in and no data was uploaded. Therefore we can only
- # tell by the empty filename that no upload was made.
- key = item.name
- if (hasattr(item, 'file') and hasattr(item, 'filename')
- and hasattr(item,'headers')):
- if (item.file and
- (item.filename is not None and item.filename != ''
- # RFC 1867 says that all fields get a content-type.
- # or 'content-type' in map(lower, item.headers.keys())
- )):
- item = FileUpload(item)
- else:
- item = item.value
-
- flags = 0
- converter = None
-
- # Loop through the different types and set
- # the appropriate flags
- # Syntax: var_name:type_name
-
- # We'll search from the back to the front.
- # We'll do the search in two steps. First, we'll
- # do a string search, and then we'll check it with
- # a re search.
-
- while key:
- pos = key.rfind(":")
- if pos < 0:
- break
- match = self._typeFormat.match(key, pos + 1)
- if match is None:
- break
-
- key, type_name = key[:pos], key[pos + 1:]
-
- # find the right type converter
- c = get_converter(type_name, None)
-
- if c is not None:
- converter = c
- flags |= CONVERTED
- elif type_name == 'list':
- flags |= SEQUENCE
- elif type_name == 'tuple':
- self.__tuple_items[key] = 1
- flags |= SEQUENCE
- elif (type_name == 'method' or type_name == 'action'):
- if key:
- self.__meth = key
- else:
- self.__meth = item
- elif (type_name == 'default_method'
- or type_name == 'default_action') and not self.__meth:
- if key:
- self.__meth = key
- else:
- self.__meth = item
- elif type_name == 'default':
- flags |= DEFAULT
- elif type_name == 'record':
- flags |= RECORD
- elif type_name == 'records':
- flags |= RECORDS
- elif type_name == 'ignore_empty' and not item:
- # skip over empty fields
- return
-
- # Make it unicode if not None
- if key is not None:
- key = self._decode(key)
-
- if type(item) == StringType:
- item = self._decode(item)
-
- if flags:
- self.__setItemWithType(key, item, flags, converter)
- else:
- self.__setItemWithoutType(key, item)
-
- def __setItemWithoutType(self, key, item):
- """Set item value without explicit type."""
- form = self.form
- if key not in form:
- form[key] = item
- else:
- found = form[key]
- if isinstance(found, list):
- found.append(item)
- else:
- form[key] = [found, item]
-
- def __setItemWithType(self, key, item, flags, converter):
- """Set item value with explicit type."""
- #Split the key and its attribute
- if flags & REC:
- key, attr = self.__splitKey(key)
-
- # defer conversion
- if flags & CONVERTED:
- try:
- item = converter(item)
- except:
- if item or flags & DEFAULT or key not in self.__defaults:
- raise
- item = self.__defaults[key]
- if flags & RECORD:
- item = getattr(item, attr)
- elif flags & RECORDS:
- item = getattr(item[-1], attr)
-
- # Determine which dictionary to use
- if flags & DEFAULT:
- form = self.__defaults
- else:
- form = self.form
-
- # Insert in dictionary
- if key not in form:
- if flags & SEQUENCE:
- item = [item]
- if flags & RECORD:
- r = form[key] = Record()
- setattr(r, attr, item)
- elif flags & RECORDS:
- r = Record()
- setattr(r, attr, item)
- form[key] = [r]
- else:
- form[key] = item
- else:
- r = form[key]
- if flags & RECORD:
- if not flags & SEQUENCE:
- setattr(r, attr, item)
- else:
- if not hasattr(r, attr):
- setattr(r, attr, [item])
- else:
- getattr(r, attr).append(item)
- elif flags & RECORDS:
- last = r[-1]
- if not hasattr(last, attr):
- if flags & SEQUENCE:
- item = [item]
- setattr(last, attr, item)
- else:
- if flags & SEQUENCE:
- getattr(last, attr).append(item)
- else:
- new = Record()
- setattr(new, attr, item)
- r.append(new)
- else:
- if isinstance(r, list):
- r.append(item)
- else:
- form[key] = [r, item]
-
- def __splitKey(self, key):
- """Split the key and its attribute."""
- i = key.rfind(".")
- if i >= 0:
- return key[:i], key[i + 1:]
- return key, ""
-
- def __convertToTuples(self):
- """Convert form values to tuples."""
- form = self.form
-
- for key in self.__tuple_items:
- if key in form:
- form[key] = tuple(form[key])
- else:
- k, attr = self.__splitKey(key)
-
- # remove any type_names in the attr
- i = attr.find(":")
- if i >= 0:
- attr = attr[:i]
-
- if k in form:
- item = form[k]
- if isinstance(item, Record):
- if hasattr(item, attr):
- setattr(item, attr, tuple(getattr(item, attr)))
- else:
- for v in item:
- if hasattr(v, attr):
- setattr(v, attr, tuple(getattr(v, attr)))
-
- def __insertDefaults(self):
- """Insert defaults into form dictionary."""
- form = self.form
-
- for keys, values in self.__defaults.iteritems():
- if not keys in form:
- form[keys] = values
- else:
- item = form[keys]
- if isinstance(values, Record):
- for k, v in values.items():
- if not hasattr(item, k):
- setattr(item, k, v)
- elif isinstance(values, list):
- for val in values:
- if isinstance(val, Record):
- for k, v in val.items():
- for r in item:
- if not hasattr(r, k):
- setattr(r, k, v)
- elif not val in item:
- item.append(val)
-
- def traverse(self, obj):
- 'See IPublisherRequest'
-
- ob = super(BrowserRequest, self).traverse(obj)
- method = self.method
-
- base_needed = 0
- if self._path_suffix:
- # We had a :method variable, so we need to set the base,
- # but we don't look for default documents any more.
- base_needed = 1
- redirect = 0
- elif method in DEFAULTABLE_METHODS:
- # We need to check for default documents
- publication = self.publication
-
- nsteps = 0
- ob, add_steps = publication.getDefaultTraversal(self, ob)
- while add_steps:
- nsteps += len(add_steps)
- add_steps = list(add_steps)
- add_steps.reverse()
- self.setTraversalStack(add_steps)
- ob = super(BrowserRequest, self).traverse(ob)
- ob, add_steps = publication.getDefaultTraversal(self, ob)
-
- if nsteps != self._endswithslash:
- base_needed = 1
- redirect = self.use_redirect and method == 'GET'
-
-
- if base_needed:
- url = self.getURL()
- response = self.response
- if redirect:
- response.redirect(url)
- return ''
- elif not response.getBase():
- response.setBase(url)
-
- return ob
-
- def keys(self):
- 'See Interface.Common.Mapping.IEnumerableMapping'
- d = {}
- d.update(self._environ)
- d.update(self._cookies)
- d.update(self.form)
- return d.keys()
-
-
- def get(self, key, default=None):
- 'See Interface.Common.Mapping.IReadMapping'
- marker = object()
- result = self.form.get(key, marker)
- if result is not marker:
- return result
-
- return super(BrowserRequest, self).get(key, default)
-
-class ZopeFieldStorage(FieldStorage):
-
- def make_file(self, binary=None):
- return tempfile.NamedTemporaryFile('w+b')
-
-
-class FileUpload(object):
- '''File upload objects
-
- File upload objects are used to represent file-uploaded data.
-
- File upload objects can be used just like files.
-
- In addition, they have a 'headers' attribute that is a dictionary
- containing the file-upload headers, and a 'filename' attribute
- containing the name of the uploaded file.
- '''
-
- def __init__(self, aFieldStorage):
-
- file = aFieldStorage.file
- if hasattr(file, '__methods__'):
- methods = file.__methods__
- else:
- methods = ['close', 'fileno', 'flush', 'isatty',
- 'read', 'readline', 'readlines', 'seek',
- 'tell', 'truncate', 'write', 'writelines',
- 'name']
-
- d = self.__dict__
- for m in methods:
- if hasattr(file,m):
- d[m] = getattr(file,m)
-
- self.headers = aFieldStorage.headers
- self.filename = unicode(aFieldStorage.filename, 'UTF-8')
-
-class RedirectingBrowserRequest(BrowserRequest):
- """Browser requests that redirect when the actual and effective URLs differ
- """
-
- use_redirect = True
-
-class TestRequest(BrowserRequest):
- """Browser request with a constructor convenient for testing
- """
-
- def __init__(self, body_instream=None, environ=None, form=None,
- skin=None, **kw):
-
- _testEnv = {
- 'SERVER_URL': 'http://127.0.0.1',
- 'HTTP_HOST': '127.0.0.1',
- 'CONTENT_LENGTH': '0',
- 'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
- }
-
- if environ is not None:
- _testEnv.update(environ)
-
- if kw:
- _testEnv.update(kw)
- if body_instream is None:
- from StringIO import StringIO
- body_instream = StringIO('')
-
- super(TestRequest, self).__init__(body_instream, _testEnv)
- if form:
- self.form.update(form)
-
- # Setup locale object
- langs = BrowserLanguages(self).getPreferredLanguages()
- from zope.i18n.locales import locales
- if not langs or langs[0] == '':
- self._locale = locales.getLocale(None, None, None)
- else:
- parts = (langs[0].split('-') + [None, None])[:3]
- self._locale = locales.getLocale(*parts)
-
- if skin is not None:
- directlyProvides(self, skin)
- else:
- directlyProvides(self, IDefaultBrowserLayer)
-
-
-
-class BrowserResponse(HTTPResponse):
- """Browser response
- """
-
- __slots__ = (
- '_base', # The base href
- )
-
- def _implicitResult(self, body):
- content_type = self.getHeader('content-type')
- if content_type is None:
- if isHTML(body):
- content_type = 'text/html'
- else:
- content_type = 'text/plain'
- self.setHeader('x-content-type-warning', 'guessed from content')
- self.setHeader('content-type', content_type)
-
- body, headers = super(BrowserResponse, self)._implicitResult(body)
- body = self.__insertBase(body)
- # Update the Content-Length header to account for the inserted
- # <base> tag.
- headers = [
- (name, value) for name, value in headers
- if name != 'content-length'
- ]
- headers.append(('content-length', str(len(body))))
- return body, headers
-
-
- def __insertBase(self, body):
- # Only insert a base tag if content appears to be html.
- content_type = self.getHeader('content-type', '')
- if content_type and not is_text_html(content_type):
- return body
-
- if self.getBase():
- if body:
- match = start_of_header_search(body)
- if match is not None:
- index = match.start(0) + len(match.group(0))
- ibase = base_re_search(body)
- if ibase is None:
- # Make sure the base URL is not a unicode string.
- base = str(self.getBase())
- body = ('%s\n<base href="%s" />\n%s' %
- (body[:index], base, body[index:]))
- return body
-
- def getBase(self):
- return getattr(self, '_base', '')
-
- def setBase(self, base):
- self._base = base
-
- def redirect(self, location, status=None):
- base = getattr(self, '_base', '')
- if base and isRelative(str(location)):
- l = base.rfind('/')
- if l >= 0:
- base = base[:l+1]
- else:
- base += '/'
- location = base + location
-
- # TODO: HTTP redirects must provide an absolute location, see
- # http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.30
- # So, what if location is relative and base is unknown? Uncomment
- # the following and you'll see that it actually happens.
- #
- # if isRelative(str(location)):
- # raise AssertionError('Cannot determine absolute location')
-
- return super(BrowserResponse, self).redirect(location, status)
-
- def reset(self):
- super(BrowserResponse, self).reset()
- self._base = ''
-
-def isHTML(str):
- """Try to determine whether str is HTML or not."""
- s = str.lstrip().lower()
- if s.startswith('<!doctype html'):
- return True
- if s.startswith('<html') and (s[5:6] in ' >'):
- return True
- if s.startswith('<!--'):
- idx = s.find('<html')
- return idx > 0 and (s[idx+5:idx+6] in ' >')
- else:
- return False
-
-def normalize_lang(lang):
- lang = lang.strip().lower()
- lang = lang.replace('_', '-')
- lang = lang.replace(' ', '')
- return lang
-
-class BrowserLanguages(object):
- zope.component.adapts(IHTTPRequest)
- implements(IUserPreferredLanguages)
-
- def __init__(self, request):
- self.request = request
-
- def getPreferredLanguages(self):
- '''See interface IUserPreferredLanguages'''
- accept_langs = self.request.get('HTTP_ACCEPT_LANGUAGE', '').split(',')
-
- # Normalize lang strings
- accept_langs = [normalize_lang(l) for l in accept_langs]
- # Then filter out empty ones
- accept_langs = [l for l in accept_langs if l]
-
- accepts = []
- for index, lang in enumerate(accept_langs):
- l = lang.split(';', 2)
-
- # If not supplied, quality defaults to 1...
- quality = 1.0
-
- if len(l) == 2:
- q = l[1]
- if q.startswith('q='):
- q = q.split('=', 2)[1]
- try:
- quality = float(q)
- except ValueError:
- # malformed quality value, skip it.
- continue
-
- if quality == 1.0:
- # ... but we use 1.9 - 0.001 * position to
- # keep the ordering between all items with
- # 1.0 quality, which may include items with no quality
- # defined, and items with quality defined as 1.
- quality = 1.9 - (0.001 * index)
-
- accepts.append((quality, l[0]))
-
- # Filter langs with q=0, which means
- # unwanted lang according to the spec
- # See: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.4
- accepts = [acc for acc in accepts if acc[0]]
-
- accepts.sort()
- accepts.reverse()
-
- return [lang for quality, lang in accepts]
-
-class BrowserView(Location):
- """Browser View.
-
- >>> view = BrowserView("context", "request")
- >>> view.context
- 'context'
- >>> view.request
- 'request'
-
- >>> view.__parent__
- 'context'
- >>> view.__parent__ = "parent"
- >>> view.__parent__
- 'parent'
- """
- implements(IBrowserView)
-
- def __init__(self, context, request):
- self.context = context
- self.request = request
-
- def __getParent(self):
- return getattr(self, '_parent', self.context)
-
- def __setParent(self, parent):
- self._parent = parent
-
- __parent__ = property(__getParent, __setParent)
-
-class BrowserPage(BrowserView):
- """Browser page
-
- To create a page, which is an object that is published as a page,
- you need to provide an object that:
-
- - has a __call__ method and that
-
- - provides IBrowserPublisher, and
-
- - if ZPT is going to be used, then your object should also provide
- request and context attributes.
-
- The BrowserPage base class provides a standard constructor and a
- simple implementation of IBrowserPublisher:
-
- >>> class MyPage(BrowserPage):
- ... pass
-
- >>> request = TestRequest()
- >>> context = object()
- >>> page = MyPage(context, request)
-
- >>> from zope.publisher.interfaces.browser import IBrowserPublisher
- >>> IBrowserPublisher.providedBy(page)
- True
-
- >>> page.browserDefault(request) == (page, ())
- True
-
- >>> page.publishTraverse(request, 'bob') # doctest: +ELLIPSIS
- Traceback (most recent call last):
- ...
- NotFound: Object: <zope.publisher.browser.MyPage object at ...>, name: 'bob'
-
- >>> page.request is request
- True
-
- >>> page.context is context
- True
-
- But it doesn't supply a __call__ method:
-
- >>> page()
- Traceback (most recent call last):
- ...
- NotImplementedError: Subclasses should override __call__ to provide a response body
-
- It is the subclass' responsibility to do that.
-
- """
- implements(IBrowserPage)
-
- def browserDefault(self, request):
- return self, ()
-
- def publishTraverse(self, request, name):
- raise NotFound(self, name, request)
-
- def __call__(self, *args, **kw):
- raise NotImplementedError("Subclasses should override __call__ to "
- "provide a response body")
-
-def setDefaultSkin(request):
- """Sets the default skin for the request.
-
- The default skin is a marker interface that can be registered as an
- adapter that provides IDefaultSkin for the request type.
-
- If a default skin is not available, the default layer
- (IDefaultBrowserLayer) is used.
-
- To illustrate, we'll first use setDefaultSkin without a registered
- IDefaultSkin adapter:
-
- >>> class Request(object):
- ... implements(IBrowserRequest)
-
- >>> request = Request()
- >>> IDefaultBrowserLayer.providedBy(request)
- False
-
- >>> setDefaultSkin(request)
- >>> IDefaultBrowserLayer.providedBy(request)
- True
-
- When we register a default layer, however:
-
- >>> from zope.interface import Interface
- >>> class IMySkin(Interface):
- ... pass
- >>> zope.component.provideAdapter(IMySkin, (IBrowserRequest,),
- ... IDefaultSkin)
-
- setDefaultSkin uses the registered layer instead of IDefaultBrowserLayer:
-
- >>> request = Request()
- >>> IMySkin.providedBy(request)
- False
- >>> IDefaultSkin.providedBy(request)
- False
-
- >>> setDefaultSkin(request)
-
- >>> IMySkin.providedBy(request)
- True
- >>> IDefaultBrowserLayer.providedBy(request)
- False
-
- Any interfaces that are directly provided by the request coming into this
- method are replaced by the applied layer/skin interface:
-
- >>> request = Request()
- >>> class IFoo(Interface):
- ... pass
- >>> directlyProvides(request, IFoo)
- >>> IFoo.providedBy(request)
- True
- >>> setDefaultSkin(request)
- >>> IFoo.providedBy(request)
- False
-
- """
- adapters = zope.component.getSiteManager().adapters
- skin = adapters.lookup((providedBy(request),), IDefaultSkin, '')
- if skin is not None:
- directlyProvides(request, skin)
- else:
- directlyProvides(request, IDefaultBrowserLayer)
-
-def applySkin(request, skin):
- """Change the presentation skin for this request.
-
- >>> import pprint
- >>> from zope.interface import Interface
- >>> class SkinA(Interface): pass
- >>> directlyProvides(SkinA, IBrowserSkinType)
- >>> class SkinB(Interface): pass
- >>> directlyProvides(SkinB, IBrowserSkinType)
- >>> class IRequest(Interface): pass
-
- >>> class Request(object):
- ... implements(IRequest)
-
- >>> req = Request()
-
- >>> applySkin(req, SkinA)
- >>> pprint.pprint(list(providedBy(req).interfaces()))
- [<InterfaceClass zope.publisher.browser.SkinA>,
- <InterfaceClass zope.publisher.browser.IRequest>]
-
- >>> applySkin(req, SkinB)
- >>> pprint.pprint(list(providedBy(req).interfaces()))
- [<InterfaceClass zope.publisher.browser.SkinB>,
- <InterfaceClass zope.publisher.browser.IRequest>]
-
- Changing the skin on a request triggers an ISkinChangedEvent:
-
- >>> import zope.component
- >>> from zope.publisher.interfaces.browser import ISkinChangedEvent
- >>> def receiveSkinEvent(event):
- ... print event.request
- >>> zope.component.provideHandler(receiveSkinEvent, (ISkinChangedEvent,))
- >>> applySkin(req, SkinA) # doctest: +ELLIPSIS
- <zope.publisher.browser.Request object at 0x...>
-
- Make sure our registrations go away again.
-
- >>> from zope.testing.cleanup import cleanUp
- >>> cleanUp()
-
- """
- # Remove all existing skin declarations (commonly the default skin).
- ifaces = [iface for iface in directlyProvidedBy(request)
- if not IBrowserSkinType.providedBy(iface)]
- # Add the new skin.
- ifaces.append(skin)
- directlyProvides(request, *ifaces)
- zope.event.notify(SkinChangedEvent(request))
-
-class SkinChangedEvent(object):
-
- zope.interface.implements(ISkinChangedEvent)
-
- def __init__(self, request):
- self.request = request
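
Note (not part of the commit): the Accept-Language handling in
BrowserLanguages.getPreferredLanguages() above may be easier to follow as a
standalone function. The following is a minimal Python 3 sketch, assuming the
raw header string is passed in directly rather than read from a request. The
name preferred_languages is hypothetical and does not exist in zope.publisher;
the logic mirrors the method shown in the diff.

```python
def normalize_lang(lang):
    # Same normalization as in the diff: lowercase, '_' -> '-', drop spaces.
    lang = lang.strip().lower()
    lang = lang.replace('_', '-')
    return lang.replace(' ', '')


def preferred_languages(header):
    """Order language tags from an Accept-Language header by quality.

    Hypothetical helper for illustration; it takes the header value
    instead of a request object.
    """
    langs = [normalize_lang(part) for part in header.split(',')]
    langs = [lang for lang in langs if lang]

    accepts = []
    for index, lang in enumerate(langs):
        parts = lang.split(';', 2)
        quality = 1.0  # quality defaults to 1 when no q= is given
        if len(parts) == 2:
            q = parts[1]
            if q.startswith('q='):
                q = q.split('=', 2)[1]
            try:
                quality = float(q)
            except ValueError:
                continue  # malformed quality value: skip this entry
        if quality == 1.0:
            # Keep header order among entries that share quality 1.
            quality = 1.9 - (0.001 * index)
        accepts.append((quality, parts[0]))

    # q=0 marks a language as unwanted (RFC 2616, section 14.4).
    accepts = [acc for acc in accepts if acc[0]]
    accepts.sort()
    accepts.reverse()
    return [lang for quality, lang in accepts]


if __name__ == '__main__':
    print(preferred_languages('da, en-gb;q=0.8, en;q=0.7'))
```

Entries with a malformed q value are dropped rather than treated as quality 1,
which matches the LP #273296 behaviour described in CHANGES.txt.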
Copied: zope.publisher/tags/3.5.7/src/zope/publisher/browser.py (from rev 98932, zope.publisher/branches/3.5/src/zope/publisher/browser.py)
===================================================================
--- zope.publisher/tags/3.5.7/src/zope/publisher/browser.py (rev 0)
+++ zope.publisher/tags/3.5.7/src/zope/publisher/browser.py 2009-04-06 15:58:33 UTC (rev 98933)
@@ -0,0 +1,1037 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Browser-specific Publisher classes
+
+Here we define the specific 'BrowserRequest' and 'BrowserResponse' classes. The
+big improvement of 'BrowserRequest' over 'HTTPRequest' is that it can handle
+HTML form data and convert it into a Python-native format. Even file data is
+packaged into a nice, Python-friendly 'FileUpload' object.
+
+$Id$
+"""
+__docformat__ = 'restructuredtext'
+
+import re
+from types import ListType, TupleType, StringType
+from cgi import FieldStorage
+import tempfile
+
+import zope.component
+from zope.interface import implements, directlyProvides
+from zope.interface import directlyProvidedBy, providedBy
+from zope.i18n.interfaces import IUserPreferredLanguages
+from zope.i18n.interfaces import IUserPreferredCharsets
+from zope.location import Location
+
+from zope.publisher.interfaces import NotFound
+from zope.publisher.interfaces.browser import IBrowserRequest
+from zope.publisher.interfaces.browser import IDefaultBrowserLayer
+from zope.publisher.interfaces.browser import IDefaultSkin
+from zope.publisher.interfaces.browser import IBrowserApplicationRequest
+from zope.publisher.interfaces.browser import IBrowserView
+from zope.publisher.interfaces.browser import IBrowserPage
+from zope.publisher.interfaces.browser import IBrowserSkinType
+from zope.publisher.interfaces.browser import ISkinChangedEvent
+from zope.publisher.interfaces.http import IHTTPRequest
+from zope.publisher.http import HTTPRequest, HTTPResponse
+
+__ArrayTypes = (ListType, TupleType)
+
+start_of_header_search=re.compile('(<head[^>]*>)', re.I).search
+base_re_search=re.compile('(<base.*?>)',re.I).search
+isRelative = re.compile("[-_.!~*a-zA-Z0-9'()@&=+$,]+(/|$)").match
+newlines = re.compile('\r\n|\n\r|\r')
+
+def is_text_html(content_type):
+ return content_type.startswith('text/html')
+
+# Flag Constants
+SEQUENCE = 1
+DEFAULT = 2
+RECORD = 4
+RECORDS = 8
+REC = RECORD | RECORDS
+CONVERTED = 32
+DEFAULTABLE_METHODS = 'GET', 'POST', 'HEAD'
+
+
+def field2string(v):
+ if hasattr(v, 'read'):
+ return v.read()
+ return str(v)
+
+def field2text(v, nl=newlines):
+ return nl.sub("\n", field2string(v))
+
+def field2required(v):
+ v = field2string(v)
+ if not v.strip():
+ raise ValueError('No input for required field<p>')
+ return v
+
+def field2int(v):
+ if isinstance(v, __ArrayTypes):
+ return map(field2int, v)
+ v = field2string(v)
+ if not v:
+ raise ValueError('Empty entry when <strong>integer</strong> expected')
+ try:
+ return int(v)
+ except ValueError:
+ raise ValueError("An integer was expected in the value '%s'" % v)
+
+def field2float(v):
+ if isinstance(v, __ArrayTypes):
+ return map(field2float, v)
+ v = field2string(v)
+ if not v:
+ raise ValueError(
+ 'Empty entry when <strong>floating-point number</strong> expected')
+ try:
+ return float(v)
+ except ValueError:
+ raise ValueError(
+ "A floating-point number was expected in the value '%s'" % v)
+
+def field2long(v):
+ if isinstance(v, __ArrayTypes):
+ return map(field2long, v)
+ v = field2string(v)
+
+ # handle trailing 'L' if present.
+ if v and v[-1].upper() == 'L':
+ v = v[:-1]
+ if not v:
+ raise ValueError('Empty entry when <strong>integer</strong> expected')
+ try:
+ return long(v)
+ except ValueError:
+ raise ValueError("A long integer was expected in the value '%s'" % v)
+
+def field2tokens(v):
+ return field2string(v).split()
+
+def field2lines(v):
+ if isinstance(v, __ArrayTypes):
+ return [str(item) for item in v]
+ return field2text(v).splitlines()
+
+def field2boolean(v):
+ return bool(v)
+
+type_converters = {
+ 'float': field2float,
+ 'int': field2int,
+ 'long': field2long,
+ 'string': field2string,
+ 'required': field2required,
+ 'tokens': field2tokens,
+ 'lines': field2lines,
+ 'text': field2text,
+ 'boolean': field2boolean,
+ }
+
+get_converter = type_converters.get
+
+def registerTypeConverter(field_type, converter, replace=False):
+ """Add a custom type converter to the registry.
+
+ o If 'replace' is not true, raise a KeyError if a converter is
+ already registered for 'field_type'.
+ """
+ existing = type_converters.get(field_type)
+
+ if existing is not None and not replace:
+ raise KeyError('Existing converter for field_type: %s' % field_type)
+
+ type_converters[field_type] = converter
+
+
+isCGI_NAME = {
+ # These fields are placed in request.environ instead of request.form.
+ 'SERVER_SOFTWARE' : 1,
+ 'SERVER_NAME' : 1,
+ 'GATEWAY_INTERFACE' : 1,
+ 'SERVER_PROTOCOL' : 1,
+ 'SERVER_PORT' : 1,
+ 'REQUEST_METHOD' : 1,
+ 'PATH_INFO' : 1,
+ 'PATH_TRANSLATED' : 1,
+ 'SCRIPT_NAME' : 1,
+ 'QUERY_STRING' : 1,
+ 'REMOTE_HOST' : 1,
+ 'REMOTE_ADDR' : 1,
+ 'AUTH_TYPE' : 1,
+ 'REMOTE_USER' : 1,
+ 'REMOTE_IDENT' : 1,
+ 'CONTENT_TYPE' : 1,
+ 'CONTENT_LENGTH' : 1,
+ 'SERVER_URL': 1,
+ }.has_key
+
+hide_key={
+ 'HTTP_AUTHORIZATION':1,
+ 'HTTP_CGI_AUTHORIZATION': 1,
+ }.has_key
+
+class Record(object):
+
+ _attrs = frozenset(('get', 'keys', 'items', 'values', 'copy',
+ 'has_key', '__contains__'))
+
+ def __getattr__(self, key, default=None):
+ if key in self._attrs:
+ return getattr(self.__dict__, key)
+ raise AttributeError(key)
+
+ def __getitem__(self, key):
+ return self.__dict__[key]
+
+ def __str__(self):
+ items = self.__dict__.items()
+ items.sort()
+ return "{" + ", ".join(["%s: %s" % item for item in items]) + "}"
+
+ def __repr__(self):
+ items = self.__dict__.items()
+ items.sort()
+ return ("{"
+ + ", ".join(["%s: %s" % (key, repr(value))
+ for key, value in items]) + "}")
+
+_get_or_head = 'GET', 'HEAD'
+class BrowserRequest(HTTPRequest):
+ implements(IBrowserRequest, IBrowserApplicationRequest)
+
+ __slots__ = (
+ '__provides__', # Allow request to directly provide interfaces
+ 'form', # Form data
+ 'charsets', # helper attribute
+ '__meth',
+ '__tuple_items',
+ '__defaults',
+ '__annotations__',
+ )
+
+ # Set this to True in a subclass to redirect GET requests when the
+ # effective and actual URLs differ.
+ use_redirect = False
+
+ def __init__(self, body_instream, environ, response=None):
+ self.form = {}
+ self.charsets = None
+ super(BrowserRequest, self).__init__(body_instream, environ, response)
+
+
+ def _createResponse(self):
+ return BrowserResponse()
+
+ def _decode(self, text):
+ """Try to decode the text using one of the available charsets."""
+ if self.charsets is None:
+ envadapter = IUserPreferredCharsets(self)
+ self.charsets = envadapter.getPreferredCharsets() or ['utf-8']
+ for charset in self.charsets:
+ try:
+ text = unicode(text, charset)
+ break
+ except UnicodeError:
+ pass
+ return text
+
+ def processInputs(self):
+ 'See IPublisherRequest'
+
+ if self.method not in _get_or_head:
+ # Read the request body only when the method is neither GET nor HEAD.
+ fp = self._body_instream
+ if self.method == 'POST':
+ content_type = self._environ.get('CONTENT_TYPE')
+ if content_type and not (
+ content_type.startswith('application/x-www-form-urlencoded')
+ or
+ content_type.startswith('multipart/')
+ ):
+ # for non-multi and non-form content types, FieldStorage
+ # consumes the body and we have no good place to put it.
+ # So we just won't call FieldStorage. :)
+ return
+ # Python 2.6 notices QS-on-POST, which breaks BBB for us.
+ # Suppress that.
+ if 'QUERY_STRING' in self._environ:
+ env = self._environ
+ env['X-POST-QUERY_STRING'] = env.pop('QUERY_STRING')
+ else:
+ fp = None
+
+ # If 'QUERY_STRING' is not present in self._environ
+ # FieldStorage will try to get it from sys.argv[1]
+ # which is not what we need.
+ if 'QUERY_STRING' not in self._environ:
+ self._environ['QUERY_STRING'] = ''
+
+ fs = ZopeFieldStorage(fp=fp, environ=self._environ,
+ keep_blank_values=1)
+
+ fslist = getattr(fs, 'list', None)
+ if fslist is not None:
+ self.__meth = None
+ self.__tuple_items = {}
+ self.__defaults = {}
+
+ # process all entries in the field storage (form)
+ for item in fslist:
+ self.__processItem(item)
+
+ if self.__defaults:
+ self.__insertDefaults()
+
+ if self.__tuple_items:
+ self.__convertToTuples()
+
+ if self.__meth:
+ self.setPathSuffix((self.__meth,))
+
+ _typeFormat = re.compile('([a-zA-Z][a-zA-Z0-9_]+|\\.[xy])$')
+
+ def __processItem(self, item):
+ """Process item in the field storage."""
+
+ # Check whether this field is a file upload object
+ # Note: A field exists for files, even if no filename was
+ # passed in and no data was uploaded. Therefore we can only
+ # tell by the empty filename that no upload was made.
+ key = item.name
+ if (hasattr(item, 'file') and hasattr(item, 'filename')
+ and hasattr(item,'headers')):
+ if (item.file and
+ (item.filename is not None and item.filename != ''
+ # RFC 1867 says that all fields get a content-type.
+ # or 'content-type' in map(lower, item.headers.keys())
+ )):
+ item = FileUpload(item)
+ else:
+ item = item.value
+
+ flags = 0
+ converter = None
+
+ # Loop through the different types and set
+ # the appropriate flags
+ # Syntax: var_name:type_name
+
+ # We'll search from the back to the front.
+ # We'll do the search in two steps. First, we'll
+ # do a string search, and then we'll check it with
+ # a re search.
+
+ while key:
+ pos = key.rfind(":")
+ if pos < 0:
+ break
+ match = self._typeFormat.match(key, pos + 1)
+ if match is None:
+ break
+
+ key, type_name = key[:pos], key[pos + 1:]
+
+ # find the right type converter
+ c = get_converter(type_name, None)
+
+ if c is not None:
+ converter = c
+ flags |= CONVERTED
+ elif type_name == 'list':
+ flags |= SEQUENCE
+ elif type_name == 'tuple':
+ self.__tuple_items[key] = 1
+ flags |= SEQUENCE
+ elif (type_name == 'method' or type_name == 'action'):
+ if key:
+ self.__meth = key
+ else:
+ self.__meth = item
+ elif (type_name == 'default_method'
+ or type_name == 'default_action') and not self.__meth:
+ if key:
+ self.__meth = key
+ else:
+ self.__meth = item
+ elif type_name == 'default':
+ flags |= DEFAULT
+ elif type_name == 'record':
+ flags |= RECORD
+ elif type_name == 'records':
+ flags |= RECORDS
+ elif type_name == 'ignore_empty' and not item:
+ # skip over empty fields
+ return
+
+ # Make it unicode if not None
+ if key is not None:
+ key = self._decode(key)
+
+ if type(item) == StringType:
+ item = self._decode(item)
+
+ if flags:
+ self.__setItemWithType(key, item, flags, converter)
+ else:
+ self.__setItemWithoutType(key, item)
+
+ def __setItemWithoutType(self, key, item):
+ """Set item value without explicit type."""
+ form = self.form
+ if key not in form:
+ form[key] = item
+ else:
+ found = form[key]
+ if isinstance(found, list):
+ found.append(item)
+ else:
+ form[key] = [found, item]
+
+ def __setItemWithType(self, key, item, flags, converter):
+ """Set item value with explicit type."""
+ #Split the key and its attribute
+ if flags & REC:
+ key, attr = self.__splitKey(key)
+
+ # defer conversion
+ if flags & CONVERTED:
+ try:
+ item = converter(item)
+ except:
+ if item or flags & DEFAULT or key not in self.__defaults:
+ raise
+ item = self.__defaults[key]
+ if flags & RECORD:
+ item = getattr(item, attr)
+ elif flags & RECORDS:
+ item = getattr(item[-1], attr)
+
+ # Determine which dictionary to use
+ if flags & DEFAULT:
+ form = self.__defaults
+ else:
+ form = self.form
+
+ # Insert in dictionary
+ if key not in form:
+ if flags & SEQUENCE:
+ item = [item]
+ if flags & RECORD:
+ r = form[key] = Record()
+ setattr(r, attr, item)
+ elif flags & RECORDS:
+ r = Record()
+ setattr(r, attr, item)
+ form[key] = [r]
+ else:
+ form[key] = item
+ else:
+ r = form[key]
+ if flags & RECORD:
+ if not flags & SEQUENCE:
+ setattr(r, attr, item)
+ else:
+ if not hasattr(r, attr):
+ setattr(r, attr, [item])
+ else:
+ getattr(r, attr).append(item)
+ elif flags & RECORDS:
+ last = r[-1]
+ if not hasattr(last, attr):
+ if flags & SEQUENCE:
+ item = [item]
+ setattr(last, attr, item)
+ else:
+ if flags & SEQUENCE:
+ getattr(last, attr).append(item)
+ else:
+ new = Record()
+ setattr(new, attr, item)
+ r.append(new)
+ else:
+ if isinstance(r, list):
+ r.append(item)
+ else:
+ form[key] = [r, item]
+
+ def __splitKey(self, key):
+ """Split the key and its attribute."""
+ i = key.rfind(".")
+ if i >= 0:
+ return key[:i], key[i + 1:]
+ return key, ""
+
+ def __convertToTuples(self):
+ """Convert form values to tuples."""
+ form = self.form
+
+ for key in self.__tuple_items:
+ if key in form:
+ form[key] = tuple(form[key])
+ else:
+ k, attr = self.__splitKey(key)
+
+ # remove any type_names in the attr
+ i = attr.find(":")
+ if i >= 0:
+ attr = attr[:i]
+
+ if k in form:
+ item = form[k]
+ if isinstance(item, Record):
+ if hasattr(item, attr):
+ setattr(item, attr, tuple(getattr(item, attr)))
+ else:
+ for v in item:
+ if hasattr(v, attr):
+ setattr(v, attr, tuple(getattr(v, attr)))
+
+ def __insertDefaults(self):
+ """Insert defaults into form dictionary."""
+ form = self.form
+
+ for keys, values in self.__defaults.iteritems():
+ if not keys in form:
+ form[keys] = values
+ else:
+ item = form[keys]
+ if isinstance(values, Record):
+ for k, v in values.items():
+ if not hasattr(item, k):
+ setattr(item, k, v)
+ elif isinstance(values, list):
+ for val in values:
+ if isinstance(val, Record):
+ for k, v in val.items():
+ for r in item:
+ if not hasattr(r, k):
+ setattr(r, k, v)
+ elif not val in item:
+ item.append(val)
+
+ def traverse(self, obj):
+ 'See IPublisherRequest'
+
+ ob = super(BrowserRequest, self).traverse(obj)
+ method = self.method
+
+ base_needed = 0
+ if self._path_suffix:
+ # We had a :method variable, so we need to set the base,
+ # but we don't look for default documents any more.
+ base_needed = 1
+ redirect = 0
+ elif method in DEFAULTABLE_METHODS:
+ # We need to check for default documents
+ publication = self.publication
+
+ nsteps = 0
+ ob, add_steps = publication.getDefaultTraversal(self, ob)
+ while add_steps:
+ nsteps += len(add_steps)
+ add_steps = list(add_steps)
+ add_steps.reverse()
+ self.setTraversalStack(add_steps)
+ ob = super(BrowserRequest, self).traverse(ob)
+ ob, add_steps = publication.getDefaultTraversal(self, ob)
+
+ if nsteps != self._endswithslash:
+ base_needed = 1
+ redirect = self.use_redirect and method == 'GET'
+
+
+ if base_needed:
+ url = self.getURL()
+ response = self.response
+ if redirect:
+ response.redirect(url)
+ return ''
+ elif not response.getBase():
+ response.setBase(url)
+
+ return ob
+
+ def keys(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ d = {}
+ d.update(self._environ)
+ d.update(self._cookies)
+ d.update(self.form)
+ return d.keys()
+
+
+ def get(self, key, default=None):
+ 'See Interface.Common.Mapping.IReadMapping'
+ marker = object()
+ result = self.form.get(key, marker)
+ if result is not marker:
+ return result
+
+ return super(BrowserRequest, self).get(key, default)
+
+class ZopeFieldStorage(FieldStorage):
+
+ def make_file(self, binary=None):
+ return tempfile.NamedTemporaryFile('w+b')
+
+
+class FileUpload(object):
+ '''File upload objects
+
+ File upload objects are used to represent file-uploaded data.
+
+ File upload objects can be used just like files.
+
+ In addition, they have a 'headers' attribute that is a dictionary
+ containing the file-upload headers, and a 'filename' attribute
+ containing the name of the uploaded file.
+ '''
+
+ def __init__(self, aFieldStorage):
+
+ file = aFieldStorage.file
+ if hasattr(file, '__methods__'):
+ methods = file.__methods__
+ else:
+ methods = ['close', 'fileno', 'flush', 'isatty',
+ 'read', 'readline', 'readlines', 'seek',
+ 'tell', 'truncate', 'write', 'writelines',
+ 'name']
+
+ d = self.__dict__
+ for m in methods:
+ if hasattr(file,m):
+ d[m] = getattr(file,m)
+
+ self.headers = aFieldStorage.headers
+ self.filename = unicode(aFieldStorage.filename, 'UTF-8')
+
+class RedirectingBrowserRequest(BrowserRequest):
+ """Browser requests that redirect when the actual and effective URLs differ
+ """
+
+ use_redirect = True
+
+class TestRequest(BrowserRequest):
+ """Browser request with a constructor convenient for testing
+ """
+
+ def __init__(self, body_instream=None, environ=None, form=None,
+ skin=None, **kw):
+
+ _testEnv = {
+ 'SERVER_URL': 'http://127.0.0.1',
+ 'HTTP_HOST': '127.0.0.1',
+ 'CONTENT_LENGTH': '0',
+ 'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
+ }
+
+ if environ is not None:
+ _testEnv.update(environ)
+
+ if kw:
+ _testEnv.update(kw)
+ if body_instream is None:
+ from StringIO import StringIO
+ body_instream = StringIO('')
+
+ super(TestRequest, self).__init__(body_instream, _testEnv)
+ if form:
+ self.form.update(form)
+
+ # Setup locale object
+ langs = BrowserLanguages(self).getPreferredLanguages()
+ from zope.i18n.locales import locales
+ if not langs or langs[0] == '':
+ self._locale = locales.getLocale(None, None, None)
+ else:
+ parts = (langs[0].split('-') + [None, None])[:3]
+ self._locale = locales.getLocale(*parts)
+
+ if skin is not None:
+ directlyProvides(self, skin)
+ else:
+ directlyProvides(self, IDefaultBrowserLayer)
+
+
+
+class BrowserResponse(HTTPResponse):
+ """Browser response
+ """
+
+ __slots__ = (
+ '_base', # The base href
+ )
+
+ def _implicitResult(self, body):
+ content_type = self.getHeader('content-type')
+ if content_type is None:
+ if isHTML(body):
+ content_type = 'text/html'
+ else:
+ content_type = 'text/plain'
+ self.setHeader('x-content-type-warning', 'guessed from content')
+ self.setHeader('content-type', content_type)
+
+ body, headers = super(BrowserResponse, self)._implicitResult(body)
+ body = self.__insertBase(body)
+ # Update the Content-Length header to account for the inserted
+ # <base> tag.
+ headers = [
+ (name, value) for name, value in headers
+ if name != 'content-length'
+ ]
+ headers.append(('content-length', str(len(body))))
+ return body, headers
+
+
+ def __insertBase(self, body):
+ # Only insert a base tag if content appears to be html.
+ content_type = self.getHeader('content-type', '')
+ if content_type and not is_text_html(content_type):
+ return body
+
+ if self.getBase():
+ if body:
+ match = start_of_header_search(body)
+ if match is not None:
+ index = match.start(0) + len(match.group(0))
+ ibase = base_re_search(body)
+ if ibase is None:
+ # Make sure the base URL is not a unicode string.
+ base = str(self.getBase())
+ body = ('%s\n<base href="%s" />\n%s' %
+ (body[:index], base, body[index:]))
+ return body
+
+ def getBase(self):
+ return getattr(self, '_base', '')
+
+ def setBase(self, base):
+ self._base = base
+
+ def redirect(self, location, status=None):
+ base = getattr(self, '_base', '')
+ if base and isRelative(str(location)):
+ l = base.rfind('/')
+ if l >= 0:
+ base = base[:l+1]
+ else:
+ base += '/'
+ location = base + location
+
+ # TODO: HTTP redirects must provide an absolute location, see
+ # http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.30
+ # So, what if location is relative and base is unknown? Uncomment
+ # the following and you'll see that it actually happens.
+ #
+ # if isRelative(str(location)):
+ # raise AssertionError('Cannot determine absolute location')
+
+ return super(BrowserResponse, self).redirect(location, status)
+
+ def reset(self):
+ super(BrowserResponse, self).reset()
+ self._base = ''
+
+def isHTML(str):
+ """Try to determine whether str is HTML or not."""
+ s = str.lstrip().lower()
+ if s.startswith('<!doctype html'):
+ return True
+ if s.startswith('<html') and (s[5:6] in ' >'):
+ return True
+ if s.startswith('<!--'):
+ idx = s.find('<html')
+ return idx > 0 and (s[idx+5:idx+6] in ' >')
+ else:
+ return False
+
+def normalize_lang(lang):
+ lang = lang.strip().lower()
+ lang = lang.replace('_', '-')
+ lang = lang.replace(' ', '')
+ return lang
+
+class BrowserLanguages(object):
+ zope.component.adapts(IHTTPRequest)
+ implements(IUserPreferredLanguages)
+
+ def __init__(self, request):
+ self.request = request
+
+ def getPreferredLanguages(self):
+ '''See interface IUserPreferredLanguages'''
+ accept_langs = self.request.get('HTTP_ACCEPT_LANGUAGE', '').split(',')
+
+ # Normalize lang strings
+ accept_langs = [normalize_lang(l) for l in accept_langs]
+ # Then filter out empty ones
+ accept_langs = [l for l in accept_langs if l]
+
+ accepts = []
+ for index, lang in enumerate(accept_langs):
+ l = lang.split(';', 2)
+
+ # If not supplied, quality defaults to 1...
+ quality = 1.0
+
+ if len(l) == 2:
+ q = l[1]
+ if q.startswith('q='):
+ q = q.split('=', 2)[1]
+ try:
+ quality = float(q)
+ except ValueError:
+ # malformed quality value, skip it.
+ continue
+
+ if quality == 1.0:
+ # ... but we use 1.9 - 0.001 * position to
+ # keep the ordering between all items with
+ # 1.0 quality, which may include items with no quality
+ # defined, and items with quality defined as 1.
+ quality = 1.9 - (0.001 * index)
+
+ accepts.append((quality, l[0]))
+
+ # Filter langs with q=0, which means
+ # unwanted lang according to the spec
+ # See: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.4
+ accepts = [acc for acc in accepts if acc[0]]
+
+ accepts.sort()
+ accepts.reverse()
+
+ return [lang for quality, lang in accepts]
+
+class BrowserView(Location):
+ """Browser View.
+
+ >>> view = BrowserView("context", "request")
+ >>> view.context
+ 'context'
+ >>> view.request
+ 'request'
+
+ >>> view.__parent__
+ 'context'
+ >>> view.__parent__ = "parent"
+ >>> view.__parent__
+ 'parent'
+ """
+ implements(IBrowserView)
+
+ def __init__(self, context, request):
+ self.context = context
+ self.request = request
+
+ def __getParent(self):
+ return getattr(self, '_parent', self.context)
+
+ def __setParent(self, parent):
+ self._parent = parent
+
+ __parent__ = property(__getParent, __setParent)
+
+class BrowserPage(BrowserView):
+ """Browser page
+
+ To create a page, which is an object that is published as a page,
+ you need to provide an object that:
+
+ - has a __call__ method and that
+
+ - provides IBrowserPublisher, and
+
+ - if ZPT is going to be used, then your object should also provide
+ request and context attributes.
+
+ The BrowserPage base class provides a standard constructor and a
+ simple implementation of IBrowserPublisher:
+
+ >>> class MyPage(BrowserPage):
+ ... pass
+
+ >>> request = TestRequest()
+ >>> context = object()
+ >>> page = MyPage(context, request)
+
+ >>> from zope.publisher.interfaces.browser import IBrowserPublisher
+ >>> IBrowserPublisher.providedBy(page)
+ True
+
+ >>> page.browserDefault(request) == (page, ())
+ True
+
+ >>> page.publishTraverse(request, 'bob') # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ NotFound: Object: <zope.publisher.browser.MyPage object at ...>, name: 'bob'
+
+ >>> page.request is request
+ True
+
+ >>> page.context is context
+ True
+
+ But it doesn't supply a __call__ method:
+
+ >>> page()
+ Traceback (most recent call last):
+ ...
+ NotImplementedError: Subclasses should override __call__ to provide a response body
+
+ It is the subclass' responsibility to do that.
+
+ """
+ implements(IBrowserPage)
+
+ def browserDefault(self, request):
+ return self, ()
+
+ def publishTraverse(self, request, name):
+ raise NotFound(self, name, request)
+
+ def __call__(self, *args, **kw):
+ raise NotImplementedError("Subclasses should override __call__ to "
+ "provide a response body")
+
+def setDefaultSkin(request):
+ """Sets the default skin for the request.
+
+ The default skin is a marker interface that can be registered as an
+ adapter that provides IDefaultSkin for the request type.
+
+ If a default skin is not available, the default layer
+ (IDefaultBrowserLayer) is used.
+
+ To illustrate, we'll first use setDefaultSkin without a registered
+ IDefaultSkin adapter:
+
+ >>> class Request(object):
+ ... implements(IBrowserRequest)
+
+ >>> request = Request()
+ >>> IDefaultBrowserLayer.providedBy(request)
+ False
+
+ >>> setDefaultSkin(request)
+ >>> IDefaultBrowserLayer.providedBy(request)
+ True
+
+ When we register a default skin, however:
+
+ >>> from zope.interface import Interface
+ >>> class IMySkin(Interface):
+ ... pass
+ >>> zope.component.provideAdapter(IMySkin, (IBrowserRequest,),
+ ... IDefaultSkin)
+
+ setDefaultSkin applies that skin instead of IDefaultBrowserLayer:
+
+ >>> request = Request()
+ >>> IMySkin.providedBy(request)
+ False
+ >>> IDefaultSkin.providedBy(request)
+ False
+
+ >>> setDefaultSkin(request)
+
+ >>> IMySkin.providedBy(request)
+ True
+ >>> IDefaultBrowserLayer.providedBy(request)
+ False
+
+ Any interfaces that are directly provided by the request coming into this
+ method are replaced by the applied layer/skin interface:
+
+ >>> request = Request()
+ >>> class IFoo(Interface):
+ ... pass
+ >>> directlyProvides(request, IFoo)
+ >>> IFoo.providedBy(request)
+ True
+ >>> setDefaultSkin(request)
+ >>> IFoo.providedBy(request)
+ False
+
+ """
+ adapters = zope.component.getSiteManager().adapters
+ skin = adapters.lookup((providedBy(request),), IDefaultSkin, '')
+ if skin is not None:
+ directlyProvides(request, skin)
+ else:
+ directlyProvides(request, IDefaultBrowserLayer)
+
+def applySkin(request, skin):
+ """Change the presentation skin for this request.
+
+ >>> import pprint
+ >>> from zope.interface import Interface
+ >>> class SkinA(Interface): pass
+ >>> directlyProvides(SkinA, IBrowserSkinType)
+ >>> class SkinB(Interface): pass
+ >>> directlyProvides(SkinB, IBrowserSkinType)
+ >>> class IRequest(Interface): pass
+
+ >>> class Request(object):
+ ... implements(IRequest)
+
+ >>> req = Request()
+
+ >>> applySkin(req, SkinA)
+ >>> pprint.pprint(list(providedBy(req).interfaces()))
+ [<InterfaceClass zope.publisher.browser.SkinA>,
+ <InterfaceClass zope.publisher.browser.IRequest>]
+
+ >>> applySkin(req, SkinB)
+ >>> pprint.pprint(list(providedBy(req).interfaces()))
+ [<InterfaceClass zope.publisher.browser.SkinB>,
+ <InterfaceClass zope.publisher.browser.IRequest>]
+
+ Changing the skin on a request triggers an ISkinChangedEvent:
+
+ >>> import zope.component
+ >>> from zope.publisher.interfaces.browser import ISkinChangedEvent
+ >>> def receiveSkinEvent(event):
+ ... print event.request
+ >>> zope.component.provideHandler(receiveSkinEvent, (ISkinChangedEvent,))
+ >>> applySkin(req, SkinA) # doctest: +ELLIPSIS
+ <zope.publisher.browser.Request object at 0x...>
+
+ Make sure our registrations go away again.
+
+ >>> from zope.testing.cleanup import cleanUp
+ >>> cleanUp()
+
+ """
+ # Remove all existing skin declarations (commonly the default skin).
+ ifaces = [iface for iface in directlyProvidedBy(request)
+ if not IBrowserSkinType.providedBy(iface)]
+ # Add the new skin.
+ ifaces.append(skin)
+ directlyProvides(request, *ifaces)
+ zope.event.notify(SkinChangedEvent(request))
+
+class SkinChangedEvent(object):
+
+ zope.interface.implements(ISkinChangedEvent)
+
+ def __init__(self, request):
+ self.request = request
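
Note on BrowserLanguages.getPreferredLanguages in the browser.py hunk above: the ranking it performs can be sketched as a standalone function. This is a minimal Python 3 sketch and not part of the tagged code. The name preferred_languages is hypothetical, and the body of normalize_lang is an assumption, since only its last two lines are visible in this part of the diff.

```python
def normalize_lang(lang):
    # Assumed normalization: trim, lowercase, use '-' as separator,
    # and drop embedded spaces.
    lang = lang.strip().lower()
    lang = lang.replace('_', '-')
    return lang.replace(' ', '')


def preferred_languages(header):
    """Rank the languages of an Accept-Language header value."""
    langs = [normalize_lang(item) for item in header.split(',')]
    langs = [item for item in langs if item]

    accepts = []
    for index, item in enumerate(langs):
        parts = item.split(';', 2)
        quality = 1.0  # default when no q parameter is given
        if len(parts) == 2 and parts[1].startswith('q='):
            try:
                quality = float(parts[1].split('=', 2)[1])
            except ValueError:
                continue  # malformed quality value, skip this entry
        if quality == 1.0:
            # Keep header order among entries of quality 1 by giving
            # earlier positions a slightly higher sort key.
            quality = 1.9 - 0.001 * index
        accepts.append((quality, parts[0]))

    # q=0 means "not acceptable", so those entries are dropped.
    accepts = [entry for entry in accepts if entry[0]]
    return [lang for quality, lang in sorted(accepts, reverse=True)]


print(preferred_languages('da, en-gb;q=0.8, en;q=0.7'))
# ['da', 'en-gb', 'en']
```

The 1.9 offset keeps every quality-1 entry above any entry with an explicit lower quality, while the position term preserves the order in which the client listed them.
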
Deleted: zope.publisher/tags/3.5.7/src/zope/publisher/tests/test_browserrequest.py
===================================================================
--- zope.publisher/branches/3.5/src/zope/publisher/tests/test_browserrequest.py 2009-04-05 15:29:02 UTC (rev 98879)
+++ zope.publisher/tags/3.5.7/src/zope/publisher/tests/test_browserrequest.py 2009-04-06 15:58:33 UTC (rev 98933)
@@ -1,572 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-
-import sys
-import unittest
-from StringIO import StringIO
-
-from zope.interface import implements, directlyProvides, Interface
-from zope.interface.verify import verifyObject
-
-from zope.publisher.publish import publish as publish_
-from zope.publisher.http import HTTPCharsets
-from zope.publisher.browser import BrowserRequest
-from zope.publisher.interfaces import NotFound
-
-from zope.publisher.base import DefaultPublication
-from zope.publisher.interfaces.browser import IBrowserApplicationRequest
-from zope.publisher.interfaces.browser import IBrowserRequest
-from zope.publisher.interfaces.browser import IBrowserPublication
-
-from zope.publisher.tests.test_http import HTTPTests
-from zope.publisher.tests.publication import TestPublication
-
-from zope.publisher.tests.basetestipublicationrequest \
- import BaseTestIPublicationRequest
-from zope.publisher.tests.basetestipublisherrequest \
- import BaseTestIPublisherRequest
-from zope.publisher.tests.basetestiapplicationrequest \
- import BaseTestIApplicationRequest
-
-LARGE_FILE_BODY = """-----------------------------1
-Content-Disposition: form-data; name="upload"; filename="test"
-Content-Type: text/plain
-
-Here comes some text! %s
------------------------------1--
-""" % ('test' * 1000)
-
-def publish(request):
- publish_(request, handle_errors=0)
-
-class Publication(DefaultPublication):
-
- def getDefaultTraversal(self, request, ob):
- if hasattr(ob, 'browserDefault'):
- return ob.browserDefault(request)
- return ob, ()
-
-class TestBrowserRequest(BrowserRequest, HTTPCharsets):
- """Make sure that our request also implements IHTTPCharsets, so that we do
- not need to register any adapters."""
-
- def __init__(self, *args, **kw):
- self.request = self
- BrowserRequest.__init__(self, *args, **kw)
-
-
-class BrowserTests(HTTPTests):
-
- _testEnv = {
- 'PATH_INFO': '/folder/item',
- 'QUERY_STRING': 'a=5&b:int=6',
- 'SERVER_URL': 'http://foobar.com',
- 'HTTP_HOST': 'foobar.com',
- 'CONTENT_LENGTH': '0',
- 'HTTP_AUTHORIZATION': 'Should be in accessible',
- 'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
- 'HTTP_OFF_THE_WALL': "Spam 'n eggs",
- 'HTTP_ACCEPT_CHARSET': 'ISO-8859-1, UTF-8;q=0.66, UTF-16;q=0.33',
- }
-
- def setUp(self):
- super(BrowserTests, self).setUp()
-
- class AppRoot(object):
- """Required docstring for the publisher."""
-
- class Folder(object):
- """Required docstring for the publisher."""
-
- class Item(object):
- """Required docstring for the publisher."""
- def __call__(self, a, b):
- return u"%s, %s" % (`a`, `b`)
-
- class Item3(object):
- """Required docstring for the publisher."""
- def __call__(self, *args):
- return u"..."
-
- class View(object):
- """Required docstring for the publisher."""
- def browserDefault(self, request):
- return self, ['index']
-
- def index(self, a, b):
- """Required docstring for the publisher."""
- return u"%s, %s" % (`a`, `b`)
-
- class Item2(object):
- """Required docstring for the publisher."""
- view = View()
-
- def browserDefault(self, request):
- return self, ['view']
-
-
- self.app = AppRoot()
- self.app.folder = Folder()
- self.app.folder.item = Item()
- self.app.folder.item2 = Item2()
- self.app.folder.item3 = Item3()
-
- def _createRequest(self, extra_env={}, body=""):
- env = self._testEnv.copy()
- env.update(extra_env)
- if len(body):
- env['CONTENT_LENGTH'] = str(len(body))
-
- publication = Publication(self.app)
- instream = StringIO(body)
- request = TestBrowserRequest(instream, env)
- request.setPublication(publication)
- return request
-
- def testTraversalToItem(self):
- res = self._publisherResults()
- self.failUnlessEqual(
- res,
- "Status: 200 Ok\r\n"
- "Content-Length: 7\r\n"
- "Content-Type: text/plain;charset=utf-8\r\n"
- "X-Content-Type-Warning: guessed from content\r\n"
- "X-Powered-By: Zope (www.zope.org), Python (www.python.org)\r\n"
- "\r\n"
- "u'5', 6")
-
- def testNoDefault(self):
- request = self._createRequest()
- response = request.response
- publish(request)
- self.failIf(response.getBase())
-
- def testDefault(self):
- extra = {'PATH_INFO': '/folder/item2'}
- request = self._createRequest(extra)
- response = request.response
- publish(request)
- self.assertEqual(response.getBase(),
- 'http://foobar.com/folder/item2/view/index')
-
- def testDefaultPOST(self):
- extra = {'PATH_INFO': '/folder/item2', "REQUEST_METHOD": "POST"}
- request = self._createRequest(extra, body='a=5&b:int=6')
- response = request.response
- publish(request)
- self.assertEqual(response.getBase(),
- 'http://foobar.com/folder/item2/view/index')
-
- def testNoneFieldNamePost(self):
-
- """Produce a FieldStorage with a name which is None; this
- should be caught."""
-
- extra = {'REQUEST_METHOD':'POST',
- 'PATH_INFO': u'/',
- 'CONTENT_TYPE': 'multipart/form-data;\
- boundary=---------------------------1'}
-
- body = """-----------------------------1
- Content-Disposition: form-data; name="field.contentType"
- ...
- application/octet-stream
- -----------------------------1--
- """
- request = self._createRequest(extra,body=body)
- request.processInputs()
-
- def testFileUploadPost(self):
- """Produce a Fieldstorage with a file handle that exposes
- its filename."""
-
- extra = {'REQUEST_METHOD':'POST',
- 'PATH_INFO': u'/',
- 'CONTENT_TYPE': 'multipart/form-data;\
- boundary=---------------------------1'}
-
- request = self._createRequest(extra, body=LARGE_FILE_BODY)
- request.processInputs()
- self.assert_(request.form['upload'].name)
-
- def testDefault2(self):
- extra = {'PATH_INFO': '/folder/item2/view'}
- request = self._createRequest(extra)
- response = request.response
- publish(request)
- self.assertEqual(response.getBase(),
- 'http://foobar.com/folder/item2/view/index')
-
- def testDefault3(self):
- extra = {'PATH_INFO': '/folder/item2/view/index'}
- request = self._createRequest(extra)
- response = request.response
- publish(request)
- self.failIf(response.getBase())
-
- def testDefault4(self):
- extra = {'PATH_INFO': '/folder/item2/view/'}
- request = self._createRequest(extra)
- response = request.response
- publish(request)
- self.failIf(response.getBase())
-
- def testDefault6(self):
- extra = {'PATH_INFO': '/folder/item2/'}
- request = self._createRequest(extra)
- response = request.response
- publish(request)
- self.assertEqual(response.getBase(),
- 'http://foobar.com/folder/item2/view/index')
-
- def testBadPath(self):
- extra = {'PATH_INFO': '/folder/nothere/'}
- request = self._createRequest(extra)
- self.assertRaises(NotFound, publish, request)
-
- def testBadPath2(self):
- extra = {'PATH_INFO': '/folder%2Fitem2/'}
- request = self._createRequest(extra)
- self.assertRaises(NotFound, publish, request)
-
- def testForm(self):
- request = self._createRequest()
- publish(request)
- self.assertEqual(request.form,
- {u'a':u'5', u'b':6})
-
- def testFormNoEncodingUsesUTF8(self):
- encoded = 'K\xc3\x83\xc2\xb6hlerstra\xc3\x83\xc2\x9fe'
- extra = {
- # if nothing else is specified, form data should be
- # interpreted as UTF-8, as this stub query string is
- 'QUERY_STRING': 'a=5&b:int=6&street=' + encoded
- }
- request = self._createRequest(extra)
- # many mainstream browsers do not send HTTP_ACCEPT_CHARSET
- del request._environ['HTTP_ACCEPT_CHARSET']
- publish(request)
- self.assert_(isinstance(request.form[u'street'], unicode))
- self.assertEqual(unicode(encoded, 'utf-8'), request.form['street'])
-
- def testFormListTypes(self):
- extra = {'QUERY_STRING':'a:list=5&a:list=6&b=1'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a':[u'5',u'6'], u'b':u'1'})
-
- def testFormTupleTypes(self):
- extra = {'QUERY_STRING':'a:tuple=5&a:tuple=6&b=1'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a':(u'5',u'6'), u'b':u'1'})
-
- def testFormTupleRecordTypes(self):
- extra = {'QUERY_STRING':'a.x:tuple:record=5&a.x:tuple:record=6&b=1'}
- request = self._createRequest(extra)
- publish(request)
- keys = request.form.keys()
- keys.sort()
- self.assertEqual(keys, [u'a',u'b'])
- self.assertEqual(request.form[u'b'], u'1')
- self.assertEqual(request.form[u'a'].keys(), [u'x'])
- self.assertEqual(request.form[u'a'][u'x'], (u'5',u'6'))
- self.assertEqual(request.form[u'a'].x, (u'5',u'6'))
- self.assertEqual(str(request.form[u'a']), "{x: (u'5', u'6')}")
- self.assertEqual(repr(request.form[u'a']), "{x: (u'5', u'6')}")
-
- def testFormRecordsTypes(self):
- extra = {'QUERY_STRING':'a.x:records=5&a.x:records=6&b=1'}
- request = self._createRequest(extra)
- publish(request)
- keys = request.form.keys()
- keys.sort()
- self.assertEqual(keys, [u'a',u'b'])
- self.assertEqual(request.form[u'b'], u'1')
- self.assertEqual(len(request.form[u'a']), 2)
- self.assertEqual(request.form[u'a'][0][u'x'], u'5')
- self.assertEqual(request.form[u'a'][0].x, u'5')
- self.assertEqual(request.form[u'a'][1][u'x'], u'6')
- self.assertEqual(request.form[u'a'][1].x, u'6')
- self.assertEqual(str(request.form[u'a']), "[{x: u'5'}, {x: u'6'}]")
- self.assertEqual(repr(request.form[u'a']), "[{x: u'5'}, {x: u'6'}]")
-
- def testFormMultipleRecordsTypes(self):
- extra = {'QUERY_STRING':'a.x:records:int=5&a.y:records:int=51'
- '&a.x:records:int=6&a.y:records:int=61&b=1'}
- request = self._createRequest(extra)
- publish(request)
- keys = request.form.keys()
- keys.sort()
- self.assertEqual(keys, [u'a',u'b'])
- self.assertEqual(request.form[u'b'], u'1')
- self.assertEqual(len(request.form[u'a']), 2)
- self.assertEqual(request.form[u'a'][0][u'x'], 5)
- self.assertEqual(request.form[u'a'][0].x, 5)
- self.assertEqual(request.form[u'a'][0][u'y'], 51)
- self.assertEqual(request.form[u'a'][0].y, 51)
- self.assertEqual(request.form[u'a'][1][u'x'], 6)
- self.assertEqual(request.form[u'a'][1].x, 6)
- self.assertEqual(request.form[u'a'][1][u'y'], 61)
- self.assertEqual(request.form[u'a'][1].y, 61)
- self.assertEqual(str(request.form[u'a']),
- "[{x: 5, y: 51}, {x: 6, y: 61}]")
- self.assertEqual(repr(request.form[u'a']),
- "[{x: 5, y: 51}, {x: 6, y: 61}]")
-
- def testFormListRecordTypes(self):
- extra = {'QUERY_STRING':'a.x:list:record=5&a.x:list:record=6&b=1'}
- request = self._createRequest(extra)
- publish(request)
- keys = request.form.keys()
- keys.sort()
- self.assertEqual(keys, [u'a',u'b'])
- self.assertEqual(request.form[u'b'], u'1')
- self.assertEqual(request.form[u'a'].keys(), [u'x'])
- self.assertEqual(request.form[u'a'][u'x'], [u'5',u'6'])
- self.assertEqual(request.form[u'a'].x, [u'5',u'6'])
- self.assertEqual(str(request.form[u'a']), "{x: [u'5', u'6']}")
- self.assertEqual(repr(request.form[u'a']), "{x: [u'5', u'6']}")
-
- def testFormListTypes2(self):
- extra = {'QUERY_STRING':'a=5&a=6&b=1'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a':[u'5',u'6'], u'b':u'1'})
-
- def testFormIntTypes(self):
- extra = {'QUERY_STRING':'a:int=5&b:int=-5&c:int=0&d:int=-0'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a': 5, u'b': -5, u'c': 0, u'd': 0})
-
- extra = {'QUERY_STRING':'a:int='}
- request = self._createRequest(extra)
- self.assertRaises(ValueError, publish, request)
-
- extra = {'QUERY_STRING':'a:int=abc'}
- request = self._createRequest(extra)
- self.assertRaises(ValueError, publish, request)
-
- def testFormFloatTypes(self):
- extra = {'QUERY_STRING':'a:float=5&b:float=-5.01&c:float=0'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a': 5.0, u'b': -5.01, u'c': 0.0})
-
- extra = {'QUERY_STRING':'a:float='}
- request = self._createRequest(extra)
- self.assertRaises(ValueError, publish, request)
-
- extra = {'QUERY_STRING':'a:float=abc'}
- request = self._createRequest(extra)
- self.assertRaises(ValueError, publish, request)
-
- def testFormLongTypes(self):
- extra = {'QUERY_STRING':'a:long=99999999999999&b:long=0L'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a': 99999999999999, u'b': 0})
-
- extra = {'QUERY_STRING':'a:long='}
- request = self._createRequest(extra)
- self.assertRaises(ValueError, publish, request)
-
- extra = {'QUERY_STRING':'a:long=abc'}
- request = self._createRequest(extra)
- self.assertRaises(ValueError, publish, request)
-
- def testFormTokensTypes(self):
- extra = {'QUERY_STRING':'a:tokens=a%20b%20c%20d&b:tokens='}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a': [u'a', u'b', u'c', u'd'],
- u'b': []})
-
- def testFormStringTypes(self):
- extra = {'QUERY_STRING':'a:string=test&b:string='}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a': u'test', u'b': u''})
-
- def testFormLinesTypes(self):
- extra = {'QUERY_STRING':'a:lines=a%0ab%0ac%0ad&b:lines='}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a': [u'a', u'b', u'c', u'd'],
- u'b': []})
-
- def testFormTextTypes(self):
- extra = {'QUERY_STRING':'a:text=a%0a%0db%0d%0ac%0dd%0ae&b:text='}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a': u'a\nb\nc\nd\ne', u'b': u''})
-
- def testFormRequiredTypes(self):
- extra = {'QUERY_STRING':'a:required=%20'}
- request = self._createRequest(extra)
- self.assertRaises(ValueError, publish, request)
-
- def testFormBooleanTypes(self):
- extra = {'QUERY_STRING':'a:boolean=&b:boolean=1&c:boolean=%20'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a': False, u'b': True, u'c': True})
-
- def testFormDefaults(self):
- extra = {'QUERY_STRING':'a:default=10&a=6&b=1'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a':u'6', u'b':u'1'})
-
- def testFormDefaults2(self):
- extra = {'QUERY_STRING':'a:default=10&b=1'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a':u'10', u'b':u'1'})
-
- def testFormFieldName(self):
- extra = {'QUERY_STRING':'c+%2B%2F%3D%26c%3Aint=6',
- 'PATH_INFO': '/folder/item3/'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'c +/=&c': 6})
-
- def testFormFieldValue(self):
- extra = {'QUERY_STRING':'a=b+%2B%2F%3D%26b%3Aint',
- 'PATH_INFO': '/folder/item3/'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.form, {u'a':u'b +/=&b:int'})
-
- def testInterface(self):
- request = self._createRequest()
- verifyObject(IBrowserRequest, request)
- verifyObject(IBrowserApplicationRequest, request)
-
- def testIssue394(self):
- extra = {'PATH_INFO': '/folder/item3/'}
- request = self._createRequest(extra)
- del request._environ["QUERY_STRING"]
- argv = sys.argv
- sys.argv = [argv[0], "test"]
- try:
- publish(request)
- self.assertEqual(request.form, {})
- finally:
- sys.argv = argv
-
- def testIssue559(self):
- extra = {'QUERY_STRING': 'HTTP_REFERER=peter',
- 'HTTP_REFERER':'http://localhost/',
- 'PATH_INFO': '/folder/item3/'}
- request = self._createRequest(extra)
- publish(request)
- self.assertEqual(request.headers.get('HTTP_REFERER'), 'http://localhost/')
- self.assertEqual(request.form, {u'HTTP_REFERER': u'peter'})
-
-
- def test_post_body_not_consumed_unnecessarily(self):
- request = self._createRequest(
- dict(REQUEST_METHOD='POST',
- CONTENT_TYPE='application/x-foo',
- ),
- 'test body')
- request.processInputs()
- self.assertEqual(request.bodyStream.read(), 'test body')
-
- def test_post_body_not_necessarily(self):
- request = self._createRequest(
- dict(REQUEST_METHOD='POST',
- CONTENT_TYPE='application/x-www-form-urlencoded',
- ),
- 'x=1&y=2')
- request.processInputs()
- self.assertEqual(request.bodyStream.read(), '')
- self.assertEqual(dict(request.form), dict(x='1', y='2'))
-
- request = self._createRequest(
- dict(REQUEST_METHOD='POST',
- CONTENT_TYPE=('application/x-www-form-urlencoded'
- '; charset=UTF-8'),
- ),
- 'x=1&y=2')
- request.processInputs()
- self.assertEqual(request.bodyStream.read(), '')
- self.assertEqual(dict(request.form), dict(x='1', y='2'))
-
-class TestBrowserPublication(TestPublication):
- implements(IBrowserPublication)
-
- def getDefaultTraversal(self, request, ob):
- return ob, ()
-
-class APITests(BaseTestIPublicationRequest,
- BaseTestIApplicationRequest,
- BaseTestIPublisherRequest,
- unittest.TestCase):
-
- def _Test__new(self, environ=None, **kw):
- if environ is None:
- environ = kw
- return BrowserRequest(StringIO(''), environ)
-
- def test_IApplicationRequest_bodyStream(self):
- request = BrowserRequest(StringIO('spam'), {})
- self.assertEqual(request.bodyStream.read(), 'spam')
-
- # Needed by BaseTestIEnumerableMapping tests:
- def _IEnumerableMapping__stateDict(self):
- return {'id': 'ZopeOrg', 'title': 'Zope Community Web Site',
- 'greet': 'Welcome to the Zope Community Web site'}
-
- def _IEnumerableMapping__sample(self):
- return self._Test__new(**(self._IEnumerableMapping__stateDict()))
-
- def _IEnumerableMapping__absentKeys(self):
- return 'foo', 'bar'
-
- def test_IPublicationRequest_getPositionalArguments(self):
- self.assertEqual(self._Test__new().getPositionalArguments(), ())
-
- def test_IPublisherRequest_retry(self):
- self.assertEqual(self._Test__new().supportsRetry(), True)
-
- def test_IPublisherRequest_processInputs(self):
- self._Test__new().processInputs()
-
- def test_IPublisherRequest_traverse(self):
- request = self._Test__new()
- request.setPublication(TestBrowserPublication())
- app = request.publication.getApplication(request)
-
- request.setTraversalStack([])
- self.assertEqual(request.traverse(app).name, '')
- self.assertEqual(request._last_obj_traversed, app)
- request.setTraversalStack(['ZopeCorp'])
- self.assertEqual(request.traverse(app).name, 'ZopeCorp')
- self.assertEqual(request._last_obj_traversed, app.ZopeCorp)
- request.setTraversalStack(['Engineering', 'ZopeCorp'])
- self.assertEqual(request.traverse(app).name, 'Engineering')
- self.assertEqual(request._last_obj_traversed, app.ZopeCorp.Engineering)
-
-def test_suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(BrowserTests))
- suite.addTest(unittest.makeSuite(APITests))
- return suite
-
-
-if __name__ == '__main__':
- unittest.TextTestRunner().run(test_suite())
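
Note on the testForm*Types cases above: they exercise the "name:type" suffix convention for form fields. The idea can be sketched in a few lines of Python 3. This is an illustration under stated assumptions, not zope.publisher's implementation: parse_form and CONVERTERS are hypothetical names, and the sketch leaves out repeated field names, the list/tuple/record(s) flags, and defaults.

```python
from urllib.parse import parse_qsl

# Hypothetical converter table covering a few of the flags used in
# the tests; each converter receives the decoded string value.
CONVERTERS = {
    'int': int,
    'float': float,
    'boolean': bool,          # any non-empty string is true
    'tokens': str.split,      # split on whitespace
    'lines': str.splitlines,  # split on line breaks
}


def parse_form(query_string):
    """Parse a query string, applying ':type' suffixes to the values."""
    form = {}
    pairs = parse_qsl(query_string, keep_blank_values=True)
    for key, value in pairs:
        name, _, type_name = key.partition(':')
        # Flags missing from the table are ignored in this sketch and
        # the value stays a string.
        converter = CONVERTERS.get(type_name, str)
        form[name] = converter(value)  # may raise ValueError
    return form


print(parse_form('a=5&b:int=6'))
# {'a': '5', 'b': 6}
```

As in the tests, an empty or non-numeric value for an int or float field raises ValueError in this sketch rather than being silently dropped.
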
Copied: zope.publisher/tags/3.5.7/src/zope/publisher/tests/test_browserrequest.py (from rev 98932, zope.publisher/branches/3.5/src/zope/publisher/tests/test_browserrequest.py)
===================================================================
--- zope.publisher/tags/3.5.7/src/zope/publisher/tests/test_browserrequest.py (rev 0)
+++ zope.publisher/tags/3.5.7/src/zope/publisher/tests/test_browserrequest.py 2009-04-06 15:58:33 UTC (rev 98933)
@@ -0,0 +1,574 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import sys
+import unittest
+from StringIO import StringIO
+
+from zope.interface import implements, directlyProvides, Interface
+from zope.interface.verify import verifyObject
+
+from zope.publisher.publish import publish as publish_
+from zope.publisher.http import HTTPCharsets
+from zope.publisher.browser import BrowserRequest
+from zope.publisher.interfaces import NotFound
+
+from zope.publisher.base import DefaultPublication
+from zope.publisher.interfaces.browser import IBrowserApplicationRequest
+from zope.publisher.interfaces.browser import IBrowserRequest
+from zope.publisher.interfaces.browser import IBrowserPublication
+
+from zope.publisher.tests.test_http import HTTPTests
+from zope.publisher.tests.publication import TestPublication
+
+from zope.publisher.tests.basetestipublicationrequest \
+ import BaseTestIPublicationRequest
+from zope.publisher.tests.basetestipublisherrequest \
+ import BaseTestIPublisherRequest
+from zope.publisher.tests.basetestiapplicationrequest \
+ import BaseTestIApplicationRequest
+
+LARGE_FILE_BODY = """-----------------------------1
+Content-Disposition: form-data; name="upload"; filename="test"
+Content-Type: text/plain
+
+Here comes some text! %s
+-----------------------------1--
+""" % ('test' * 1000)
+
+def publish(request):
+ publish_(request, handle_errors=0)
+
+class Publication(DefaultPublication):
+
+ def getDefaultTraversal(self, request, ob):
+ if hasattr(ob, 'browserDefault'):
+ return ob.browserDefault(request)
+ return ob, ()
+
+class TestBrowserRequest(BrowserRequest, HTTPCharsets):
+ """Make sure that our request also implements IHTTPCharsets, so that we do
+ not need to register any adapters."""
+
+ def __init__(self, *args, **kw):
+ self.request = self
+ BrowserRequest.__init__(self, *args, **kw)
+
+
+class BrowserTests(HTTPTests):
+
+ _testEnv = {
+ 'PATH_INFO': '/folder/item',
+ 'QUERY_STRING': 'a=5&b:int=6',
+ 'SERVER_URL': 'http://foobar.com',
+ 'HTTP_HOST': 'foobar.com',
+ 'CONTENT_LENGTH': '0',
+ 'HTTP_AUTHORIZATION': 'Should be in accessible',
+ 'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
+ 'HTTP_OFF_THE_WALL': "Spam 'n eggs",
+ 'HTTP_ACCEPT_CHARSET': 'ISO-8859-1, UTF-8;q=0.66, UTF-16;q=0.33',
+ }
+
+ def setUp(self):
+ super(BrowserTests, self).setUp()
+
+ class AppRoot(object):
+ """Required docstring for the publisher."""
+
+ class Folder(object):
+ """Required docstring for the publisher."""
+
+ class Item(object):
+ """Required docstring for the publisher."""
+ def __call__(self, a, b):
+ return u"%s, %s" % (`a`, `b`)
+
+ class Item3(object):
+ """Required docstring for the publisher."""
+ def __call__(self, *args):
+ return u"..."
+
+ class View(object):
+ """Required docstring for the publisher."""
+ def browserDefault(self, request):
+ return self, ['index']
+
+ def index(self, a, b):
+ """Required docstring for the publisher."""
+ return u"%s, %s" % (`a`, `b`)
+
+ class Item2(object):
+ """Required docstring for the publisher."""
+ view = View()
+
+ def browserDefault(self, request):
+ return self, ['view']
+
+
+ self.app = AppRoot()
+ self.app.folder = Folder()
+ self.app.folder.item = Item()
+ self.app.folder.item2 = Item2()
+ self.app.folder.item3 = Item3()
+
+ def _createRequest(self, extra_env={}, body=""):
+ env = self._testEnv.copy()
+ env.update(extra_env)
+ if len(body):
+ env['CONTENT_LENGTH'] = str(len(body))
+
+ publication = Publication(self.app)
+ instream = StringIO(body)
+ request = TestBrowserRequest(instream, env)
+ request.setPublication(publication)
+ return request
+
+ def testTraversalToItem(self):
+ res = self._publisherResults()
+ self.failUnlessEqual(
+ res,
+ "Status: 200 Ok\r\n"
+ "Content-Length: 7\r\n"
+ "Content-Type: text/plain;charset=utf-8\r\n"
+ "X-Content-Type-Warning: guessed from content\r\n"
+ "X-Powered-By: Zope (www.zope.org), Python (www.python.org)\r\n"
+ "\r\n"
+ "u'5', 6")
+
+ def testNoDefault(self):
+ request = self._createRequest()
+ response = request.response
+ publish(request)
+ self.failIf(response.getBase())
+
+ def testDefault(self):
+ extra = {'PATH_INFO': '/folder/item2'}
+ request = self._createRequest(extra)
+ response = request.response
+ publish(request)
+ self.assertEqual(response.getBase(),
+ 'http://foobar.com/folder/item2/view/index')
+
+ def testDefaultPOST(self):
+ extra = {'PATH_INFO': '/folder/item2', "REQUEST_METHOD": "POST"}
+ request = self._createRequest(extra, body='a=5&b:int=6')
+ response = request.response
+ publish(request)
+ self.assertEqual(response.getBase(),
+ 'http://foobar.com/folder/item2/view/index')
+
+ def testNoneFieldNamePost(self):
+
+ """Produce a FieldStorage with a name which is None; this
+ should be caught."""
+
+ extra = {'REQUEST_METHOD':'POST',
+ 'PATH_INFO': u'/',
+ 'CONTENT_TYPE': 'multipart/form-data;\
+ boundary=---------------------------1'}
+
+ body = """-----------------------------1
+ Content-Disposition: form-data; name="field.contentType"
+ ...
+ application/octet-stream
+ -----------------------------1--
+ """
+ request = self._createRequest(extra,body=body)
+ request.processInputs()
+
+ def testFileUploadPost(self):
+ """Produce a Fieldstorage with a file handle that exposes
+ its filename."""
+
+ extra = {'REQUEST_METHOD':'POST',
+ 'PATH_INFO': u'/',
+ 'CONTENT_TYPE': 'multipart/form-data;\
+ boundary=---------------------------1'}
+
+ request = self._createRequest(extra, body=LARGE_FILE_BODY)
+ request.processInputs()
+ self.assert_(request.form['upload'].name)
+
+ def testDefault2(self):
+ extra = {'PATH_INFO': '/folder/item2/view'}
+ request = self._createRequest(extra)
+ response = request.response
+ publish(request)
+ self.assertEqual(response.getBase(),
+ 'http://foobar.com/folder/item2/view/index')
+
+ def testDefault3(self):
+ extra = {'PATH_INFO': '/folder/item2/view/index'}
+ request = self._createRequest(extra)
+ response = request.response
+ publish(request)
+ self.failIf(response.getBase())
+
+ def testDefault4(self):
+ extra = {'PATH_INFO': '/folder/item2/view/'}
+ request = self._createRequest(extra)
+ response = request.response
+ publish(request)
+ self.failIf(response.getBase())
+
+ def testDefault6(self):
+ extra = {'PATH_INFO': '/folder/item2/'}
+ request = self._createRequest(extra)
+ response = request.response
+ publish(request)
+ self.assertEqual(response.getBase(),
+ 'http://foobar.com/folder/item2/view/index')
+
+ def testBadPath(self):
+ extra = {'PATH_INFO': '/folder/nothere/'}
+ request = self._createRequest(extra)
+ self.assertRaises(NotFound, publish, request)
+
+ def testBadPath2(self):
+ extra = {'PATH_INFO': '/folder%2Fitem2/'}
+ request = self._createRequest(extra)
+ self.assertRaises(NotFound, publish, request)
+
+ def testForm(self):
+ request = self._createRequest()
+ publish(request)
+ self.assertEqual(request.form,
+ {u'a':u'5', u'b':6})
+
+ def testFormNoEncodingUsesUTF8(self):
+ encoded = 'K\xc3\x83\xc2\xb6hlerstra\xc3\x83\xc2\x9fe'
+ extra = {
+ # if nothing else is specified, form data should be
+ # interpreted as UTF-8, which is how this stub query string is encoded
+ 'QUERY_STRING': 'a=5&b:int=6&street=' + encoded
+ }
+ request = self._createRequest(extra)
+ # many mainstream browsers do not send HTTP_ACCEPT_CHARSET
+ del request._environ['HTTP_ACCEPT_CHARSET']
+ publish(request)
+ self.assert_(isinstance(request.form[u'street'], unicode))
+ self.assertEqual(unicode(encoded, 'utf-8'), request.form['street'])
+
+ def testFormListTypes(self):
+ extra = {'QUERY_STRING':'a:list=5&a:list=6&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a':[u'5',u'6'], u'b':u'1'})
+
+ def testFormTupleTypes(self):
+ extra = {'QUERY_STRING':'a:tuple=5&a:tuple=6&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a':(u'5',u'6'), u'b':u'1'})
+
+ def testFormTupleRecordTypes(self):
+ extra = {'QUERY_STRING':'a.x:tuple:record=5&a.x:tuple:record=6&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ keys = request.form.keys()
+ keys.sort()
+ self.assertEqual(keys, [u'a',u'b'])
+ self.assertEqual(request.form[u'b'], u'1')
+ self.assertEqual(request.form[u'a'].keys(), [u'x'])
+ self.assertEqual(request.form[u'a'][u'x'], (u'5',u'6'))
+ self.assertEqual(request.form[u'a'].x, (u'5',u'6'))
+ self.assertEqual(str(request.form[u'a']), "{x: (u'5', u'6')}")
+ self.assertEqual(repr(request.form[u'a']), "{x: (u'5', u'6')}")
+
+ def testFormRecordsTypes(self):
+ extra = {'QUERY_STRING':'a.x:records=5&a.x:records=6&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ keys = request.form.keys()
+ keys.sort()
+ self.assertEqual(keys, [u'a',u'b'])
+ self.assertEqual(request.form[u'b'], u'1')
+ self.assertEqual(len(request.form[u'a']), 2)
+ self.assertEqual(request.form[u'a'][0][u'x'], u'5')
+ self.assertEqual(request.form[u'a'][0].x, u'5')
+ self.assertEqual(request.form[u'a'][1][u'x'], u'6')
+ self.assertEqual(request.form[u'a'][1].x, u'6')
+ self.assertEqual(str(request.form[u'a']), "[{x: u'5'}, {x: u'6'}]")
+ self.assertEqual(repr(request.form[u'a']), "[{x: u'5'}, {x: u'6'}]")
+
+ def testFormMultipleRecordsTypes(self):
+ extra = {'QUERY_STRING':'a.x:records:int=5&a.y:records:int=51'
+ '&a.x:records:int=6&a.y:records:int=61&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ keys = request.form.keys()
+ keys.sort()
+ self.assertEqual(keys, [u'a',u'b'])
+ self.assertEqual(request.form[u'b'], u'1')
+ self.assertEqual(len(request.form[u'a']), 2)
+ self.assertEqual(request.form[u'a'][0][u'x'], 5)
+ self.assertEqual(request.form[u'a'][0].x, 5)
+ self.assertEqual(request.form[u'a'][0][u'y'], 51)
+ self.assertEqual(request.form[u'a'][0].y, 51)
+ self.assertEqual(request.form[u'a'][1][u'x'], 6)
+ self.assertEqual(request.form[u'a'][1].x, 6)
+ self.assertEqual(request.form[u'a'][1][u'y'], 61)
+ self.assertEqual(request.form[u'a'][1].y, 61)
+ self.assertEqual(str(request.form[u'a']),
+ "[{x: 5, y: 51}, {x: 6, y: 61}]")
+ self.assertEqual(repr(request.form[u'a']),
+ "[{x: 5, y: 51}, {x: 6, y: 61}]")
+
+ def testFormListRecordTypes(self):
+ extra = {'QUERY_STRING':'a.x:list:record=5&a.x:list:record=6&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ keys = request.form.keys()
+ keys.sort()
+ self.assertEqual(keys, [u'a',u'b'])
+ self.assertEqual(request.form[u'b'], u'1')
+ self.assertEqual(request.form[u'a'].keys(), [u'x'])
+ self.assertEqual(request.form[u'a'][u'x'], [u'5',u'6'])
+ self.assertEqual(request.form[u'a'].x, [u'5',u'6'])
+ self.assertEqual(str(request.form[u'a']), "{x: [u'5', u'6']}")
+ self.assertEqual(repr(request.form[u'a']), "{x: [u'5', u'6']}")
+
+ def testFormListTypes2(self):
+ extra = {'QUERY_STRING':'a=5&a=6&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a':[u'5',u'6'], u'b':u'1'})
+
+ def testFormIntTypes(self):
+ extra = {'QUERY_STRING':'a:int=5&b:int=-5&c:int=0&d:int=-0'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a': 5, u'b': -5, u'c': 0, u'd': 0})
+
+ extra = {'QUERY_STRING':'a:int='}
+ request = self._createRequest(extra)
+ self.assertRaises(ValueError, publish, request)
+
+ extra = {'QUERY_STRING':'a:int=abc'}
+ request = self._createRequest(extra)
+ self.assertRaises(ValueError, publish, request)
+
+ def testFormFloatTypes(self):
+ extra = {'QUERY_STRING':'a:float=5&b:float=-5.01&c:float=0'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a': 5.0, u'b': -5.01, u'c': 0.0})
+
+ extra = {'QUERY_STRING':'a:float='}
+ request = self._createRequest(extra)
+ self.assertRaises(ValueError, publish, request)
+
+ extra = {'QUERY_STRING':'a:float=abc'}
+ request = self._createRequest(extra)
+ self.assertRaises(ValueError, publish, request)
+
+ def testFormLongTypes(self):
+ extra = {'QUERY_STRING':'a:long=99999999999999&b:long=0L'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a': 99999999999999, u'b': 0})
+
+ extra = {'QUERY_STRING':'a:long='}
+ request = self._createRequest(extra)
+ self.assertRaises(ValueError, publish, request)
+
+ extra = {'QUERY_STRING':'a:long=abc'}
+ request = self._createRequest(extra)
+ self.assertRaises(ValueError, publish, request)
+
+ def testFormTokensTypes(self):
+ extra = {'QUERY_STRING':'a:tokens=a%20b%20c%20d&b:tokens='}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a': [u'a', u'b', u'c', u'd'],
+ u'b': []})
+
+ def testFormStringTypes(self):
+ extra = {'QUERY_STRING':'a:string=test&b:string='}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a': u'test', u'b': u''})
+
+ def testFormLinesTypes(self):
+ extra = {'QUERY_STRING':'a:lines=a%0ab%0ac%0ad&b:lines='}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a': [u'a', u'b', u'c', u'd'],
+ u'b': []})
+
+ def testFormTextTypes(self):
+ extra = {'QUERY_STRING':'a:text=a%0a%0db%0d%0ac%0dd%0ae&b:text='}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a': u'a\nb\nc\nd\ne', u'b': u''})
+
+ def testFormRequiredTypes(self):
+ extra = {'QUERY_STRING':'a:required=%20'}
+ request = self._createRequest(extra)
+ self.assertRaises(ValueError, publish, request)
+
+ def testFormBooleanTypes(self):
+ extra = {'QUERY_STRING':'a:boolean=&b:boolean=1&c:boolean=%20'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a': False, u'b': True, u'c': True})
+
+ def testFormDefaults(self):
+ extra = {'QUERY_STRING':'a:default=10&a=6&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a':u'6', u'b':u'1'})
+
+ def testFormDefaults2(self):
+ extra = {'QUERY_STRING':'a:default=10&b=1'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a':u'10', u'b':u'1'})
+
+ def testFormFieldName(self):
+ extra = {'QUERY_STRING':'c+%2B%2F%3D%26c%3Aint=6',
+ 'PATH_INFO': '/folder/item3/'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'c +/=&c': 6})
+
+ def testFormFieldValue(self):
+ extra = {'QUERY_STRING':'a=b+%2B%2F%3D%26b%3Aint',
+ 'PATH_INFO': '/folder/item3/'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.form, {u'a':u'b +/=&b:int'})
+
+ def testInterface(self):
+ request = self._createRequest()
+ verifyObject(IBrowserRequest, request)
+ verifyObject(IBrowserApplicationRequest, request)
+
+ def testIssue394(self):
+ extra = {'PATH_INFO': '/folder/item3/'}
+ request = self._createRequest(extra)
+ del request._environ["QUERY_STRING"]
+ argv = sys.argv
+ sys.argv = [argv[0], "test"]
+ try:
+ publish(request)
+ self.assertEqual(request.form, {})
+ finally:
+ sys.argv = argv
+
+ def testIssue559(self):
+ extra = {'QUERY_STRING': 'HTTP_REFERER=peter',
+ 'HTTP_REFERER':'http://localhost/',
+ 'PATH_INFO': '/folder/item3/'}
+ request = self._createRequest(extra)
+ publish(request)
+ self.assertEqual(request.headers.get('HTTP_REFERER'), 'http://localhost/')
+ self.assertEqual(request.form, {u'HTTP_REFERER': u'peter'})
+
+
+ def test_post_body_not_consumed_unnecessarily(self):
+ request = self._createRequest(
+ dict(REQUEST_METHOD='POST',
+ CONTENT_TYPE='application/x-foo',
+ ),
+ 'test body')
+ request.processInputs()
+ self.assertEqual(request.bodyStream.read(), 'test body')
+
+ def test_post_body_not_necessarily(self):
+ request = self._createRequest(
+ dict(REQUEST_METHOD='POST',
+ CONTENT_TYPE='application/x-www-form-urlencoded',
+ ),
+ 'x=1&y=2')
+ request.processInputs()
+ self.assertEqual(request.bodyStream.read(), '')
+ self.assertEqual(dict(request.form), dict(x='1', y='2'))
+
+ request = self._createRequest(
+ dict(REQUEST_METHOD='POST',
+ CONTENT_TYPE=('application/x-www-form-urlencoded'
+ '; charset=UTF-8'),
+ ),
+ 'x=1&y=2')
+ request.processInputs()
+ self.assertEqual(request.bodyStream.read(), '')
+ self.assertEqual(dict(request.form), dict(x='1', y='2'))
+ self.assertEqual(request.environment['X-POST-QUERY_STRING'],
+ 'a=5&b:int=6')
+
+class TestBrowserPublication(TestPublication):
+ implements(IBrowserPublication)
+
+ def getDefaultTraversal(self, request, ob):
+ return ob, ()
+
+class APITests(BaseTestIPublicationRequest,
+ BaseTestIApplicationRequest,
+ BaseTestIPublisherRequest,
+ unittest.TestCase):
+
+ def _Test__new(self, environ=None, **kw):
+ if environ is None:
+ environ = kw
+ return BrowserRequest(StringIO(''), environ)
+
+ def test_IApplicationRequest_bodyStream(self):
+ request = BrowserRequest(StringIO('spam'), {})
+ self.assertEqual(request.bodyStream.read(), 'spam')
+
+ # Needed by BaseTestIEnumerableMapping tests:
+ def _IEnumerableMapping__stateDict(self):
+ return {'id': 'ZopeOrg', 'title': 'Zope Community Web Site',
+ 'greet': 'Welcome to the Zope Community Web site'}
+
+ def _IEnumerableMapping__sample(self):
+ return self._Test__new(**(self._IEnumerableMapping__stateDict()))
+
+ def _IEnumerableMapping__absentKeys(self):
+ return 'foo', 'bar'
+
+ def test_IPublicationRequest_getPositionalArguments(self):
+ self.assertEqual(self._Test__new().getPositionalArguments(), ())
+
+ def test_IPublisherRequest_retry(self):
+ self.assertEqual(self._Test__new().supportsRetry(), True)
+
+ def test_IPublisherRequest_processInputs(self):
+ self._Test__new().processInputs()
+
+ def test_IPublisherRequest_traverse(self):
+ request = self._Test__new()
+ request.setPublication(TestBrowserPublication())
+ app = request.publication.getApplication(request)
+
+ request.setTraversalStack([])
+ self.assertEqual(request.traverse(app).name, '')
+ self.assertEqual(request._last_obj_traversed, app)
+ request.setTraversalStack(['ZopeCorp'])
+ self.assertEqual(request.traverse(app).name, 'ZopeCorp')
+ self.assertEqual(request._last_obj_traversed, app.ZopeCorp)
+ request.setTraversalStack(['Engineering', 'ZopeCorp'])
+ self.assertEqual(request.traverse(app).name, 'Engineering')
+ self.assertEqual(request._last_obj_traversed, app.ZopeCorp.Engineering)
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(BrowserTests))
+ suite.addTest(unittest.makeSuite(APITests))
+ return suite
+
+
+if __name__ == '__main__':
+ unittest.TextTestRunner().run(test_suite())
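
The form tests in the diff above (testFormIntTypes, testFormBooleanTypes, testFormTokensTypes, testFormListTypes2, testFormFieldName) all rely on a "name:type=value" convention in the query string. As a rough illustration of that convention only, here is a minimal Python 3 sketch. It is not zope.publisher's implementation: the names parse_typed_query and CONVERTERS are hypothetical, and the sketch handles a single type suffix, so records, tuples, defaults and the other compound forms tested above are out of scope.

```python
from urllib.parse import parse_qsl

# Hypothetical converter table for illustration; the real converters in
# zope.publisher are more numerous and differ in detail.
CONVERTERS = {
    "int": int,
    "float": float,
    "boolean": bool,        # empty string -> False, anything else -> True
    "tokens": str.split,    # whitespace-separated words -> list
}


def parse_typed_query(query_string):
    """Parse 'name:type=value' pairs into a dict of converted values.

    A name that occurs more than once collects its values in a list.
    An unknown type suffix raises KeyError; a value the converter cannot
    handle raises ValueError.
    """
    form = {}
    repeated = set()
    for key, value in parse_qsl(query_string, keep_blank_values=True):
        # The key is already URL-decoded here, so an escaped ':' (%3A)
        # in the field name still acts as the type separator.
        name, _, type_name = key.partition(":")
        if type_name:
            value = CONVERTERS[type_name](value)
        if name not in form:
            form[name] = value
        elif name in repeated:
            form[name].append(value)
        else:
            form[name] = [form[name], value]
            repeated.add(name)
    return form


if __name__ == "__main__":
    print(parse_typed_query("a:int=5&b:int=-5&c:int=0&d:int=-0"))
    print(parse_typed_query("a=5&a=6&b=1"))
```

As in the tests, an empty or non-numeric value for an int field (for example "a:int=" or "a:int=abc") raises ValueError in this sketch, because int() rejects it.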