[Checkins] SVN: bozo.reuse/branches/dev/src/bozo/reuse/ Implemented request.

Jim Fulton jim at zope.com
Sun Jun 7 08:34:46 EDT 2009


Log message for revision 100682:
  Implemented request.
  

Changed:
  U   bozo.reuse/branches/dev/src/bozo/reuse/__init__.py
  D   bozo.reuse/branches/dev/src/bozo/reuse/application.py
  A   bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py
  A   bozo.reuse/branches/dev/src/bozo/reuse/request.py
  U   bozo.reuse/branches/dev/src/bozo/reuse/tests.py

-=-
Modified: bozo.reuse/branches/dev/src/bozo/reuse/__init__.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/__init__.py	2009-06-07 09:44:12 UTC (rev 100681)
+++ bozo.reuse/branches/dev/src/bozo/reuse/__init__.py	2009-06-07 12:34:46 UTC (rev 100682)
@@ -1 +1,16 @@
+##############################################################################
 #
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from bozo.reuse import interfaces # Gaaah, circular imports
+from bozo.reuse.request import Request

Deleted: bozo.reuse/branches/dev/src/bozo/reuse/application.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/application.py	2009-06-07 09:44:12 UTC (rev 100681)
+++ bozo.reuse/branches/dev/src/bozo/reuse/application.py	2009-06-07 12:34:46 UTC (rev 100682)
@@ -1,238 +0,0 @@
-##############################################################################
-#
-# Copyright Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-
-import bobo
-import zope.event
-import zope.i18n.interfaces
-import zope.publisher.browser
-import zope.publisher.http
-import webob
-import webob.multidict
-
-class BozoMultiDict:
-
-    def __init__(self, data):
-        self.data = data
-
-    def __getitem__(self, k):
-        v = self.data[k]
-        if isinstance(v, list):
-            return v[-1]
-        return v
-
-    def getall(self, name):
-        value = self.data.get(name)
-        if value is None:
-            return []
-        if isinstance(value, list):
-            return value
-        return [value]
-
-    def getone(self, name):
-        value = self.data.get(name)
-        if value is None:
-            raise KeyError(name)
-        if isinstance(value, list):
-            if len(value) != 1:
-                raise KeyError(name)
-            value = value[0]
-        return value
-
-    def mixed(self):
-        return self.data
-
-    def dict_of_lists(self):
-        return dict((k, self.getall(k)) for k in self.data)
-
-    def __contains__(self, k):
-        return k in self.data
-
-    def copy(self):
-        return self.__class__(self.data.copy())
-
-    def __len__(self):
-        return len(self.data)
-
-    def iteritems(self):
-        data = self.data
-        for k in sorted(data):
-            v = data[k]
-            if isinstance(v, list):
-                for x in v:
-                    yield k, x
-            else:
-                yield k, v
-
-    def items(self):
-        return list(self.iteritems())
-
-    def iterkeys(self):
-        return (item[0] for item in self.iteritems())
-
-    __iter__ = iterkeys
-
-    def keys(self):
-        return list(self.iterkeys())
-
-    def itervalues(self):
-        return (item[1] for item in self.iteritems())
-
-    def values(self):
-        return list(self.itervalues())
-
-    def __repr__(self):
-        return "BozoMultiDict(%r)" % self.items()
-
-class Request(zope.publisher.browser.BrowserRequest):
-
-    __slots__ = ('_webob', '_form', '_charsets')
-
-    def __init__(self, *args):
-        zope.publisher.browser.BrowserRequest.__init__(self, *args)
-        self.processInputs()
-
-    @classmethod
-    def blank(class_, *args, **kw):
-        environ = webob.Request.blank(*args, **kw).environ
-        return class_(environ['wsgi.input'], environ)
-
-    @property
-    def charsets(self):
-        try:
-            return self._charsets
-        except AttributeError:
-            pass
-
-        adapter = zope.i18n.interfaces.IUserPreferredCharsets(self, None)
-        if adapter is None:
-            adapter = zope.publisher.http.HTTPCharsets(self)
-
-        self._charsets = adapter.getPreferredCharsets() or ['utf-8']
-
-        return self._charsets
-
-    @property
-    def charset(self):
-        return self.charsets[0]
-
-    @property
-    def environ(self):
-        return self._environ
-
-    @property
-    def webob(self):
-        try:
-            return self._webob
-        except AttributeError:
-            w = self._webob = webob.Request(self._environ)
-            if w.charset is None:
-                w.charset = self.charset
-            return w
-
-    @property
-    def headers(self):
-        return self.webob.headers
-
-    def __getattr__(self, name):
-        if name == '_webob':
-            raise AttributeError(name)
-        return getattr(self.webob, name)
-
-    def __setattr__(self, name, value):
-        try:
-            object.__setattr__(self, name, value)
-        except AttributeError:
-            if name == 'charsets':
-                if value is None:
-                    return
-                raise
-            setattr(self.webob, name, value)
-
-    def __delattr__(self, name):
-        try:
-            object.__delattr__(self, name)
-        except AttributeError:
-            delattr(self.webob, name)
-
-    @property
-    def body_file(self):
-        return self.bodyStream
-
-    @property
-    def body(self):
-        stream = self.bodyStream
-        stream.seek(0)
-        return stream.read()
-
-    @property
-    def params(self):
-        return BozoMultiDict(self.form)
-
-    @property
-    def params(self):
-        return BozoMultiDict(self.form)
-
-    GET = params
-
-    @property
-    def POST(self):
-        if self.method in ('POST', 'PUT'):
-            content_type = self._environ.get('CONTENT_TYPE')
-            if content_type and (
-                content_type.startswith('application/x-www-form-urlencoded')
-                or
-                content_type.startswith('multipart/')
-                ):
-                return self.params
-        return webob.multidict.NoVars('Not a POST request')
-
-
-class Application(bobo.Application):
-
-    def __call__(self, environ, start_response):
-        request = Request(environ['wsgi.input'], environ)
-        zope.event.notify(BeginRequest(request))
-        try:
-            return self.bobo_response(
-                request, request.path_info, request.method
-                )(environ, start_response)
-        finally:
-            zope.event.notify(EndRequest(request))
-
-    def build_response(self, request, method, data):
-        content_type = data.content_type
-        response = request.response
-        if content_type:
-            response.setHeader('Content-Type', content_type)
-        for name, value in data.headers:
-            response.setHeader(name, value)
-        result = data.body
-        if result is not response:
-            response.setResult(result)
-
-        def wsgiresponse(environ, start_response):
-            start_response(response.getStatusString(), response.getHeaders())
-            return response.consumeBodyIter()
-
-        return wsgiresponse
-
-class RequestEvent:
-    def __init__(self, request):
-        self.request = request
-
-class BeginRequest(RequestEvent):
-    "A request has begun"
-
-class EndRequest(RequestEvent):
-    "A request has finished processing"

Added: bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py	                        (rev 0)
+++ bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py	2009-06-07 12:34:46 UTC (rev 100682)
@@ -0,0 +1,33 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import bobo
+import zope.interface
+import zope.publisher.interfaces.browser
+
+class IResource(zope.interface.Interface):
+
+    def bobo_response(request, path, method):
+        """Return a response for the given request, path and method
+
+        The response is a callable with the same signature as a WSGI
+        application.
+        """
+
+# XXX bobo should expose somethinbg we can use:
+zope.interface.classImplements(bobo._Handler, IResource)
+
+class IRequest(zope.publisher.interfaces.browser.IBrowserRequest):
+    """Browser requests that also provide the webob.Request API
+    """


Property changes on: bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: bozo.reuse/branches/dev/src/bozo/reuse/request.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/request.py	                        (rev 0)
+++ bozo.reuse/branches/dev/src/bozo/reuse/request.py	2009-06-07 12:34:46 UTC (rev 100682)
@@ -0,0 +1,244 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import interfaces
+import webob
+import webob.multidict
+import zope.i18n.interfaces
+import zope.interface
+import zope.publisher.browser
+import zope.publisher.http
+
+class BozoMultiDict:
+
+    def __init__(self, data):
+        self.data = data
+
+    def __getitem__(self, k):
+        v = self.data[k]
+        if isinstance(v, list):
+            return v[-1]
+        return v
+
+    def getall(self, name):
+        value = self.data.get(name)
+        if value is None:
+            return []
+        if isinstance(value, list):
+            return value
+        return [value]
+
+    def getone(self, name):
+        value = self.data.get(name)
+        if value is None:
+            raise KeyError(name)
+        if isinstance(value, list):
+            if len(value) != 1:
+                raise KeyError(name)
+            value = value[0]
+        return value
+
+    def mixed(self):
+        return self.data
+
+    def dict_of_lists(self):
+        return dict((k, self.getall(k)) for k in self.data)
+
+    def __contains__(self, k):
+        return k in self.data
+
+    def copy(self):
+        return self.__class__(self.data.copy())
+
+    def __len__(self):
+        return len(self.data)
+
+    def iteritems(self):
+        data = self.data
+        for k in sorted(data):
+            v = data[k]
+            if isinstance(v, list):
+                for x in v:
+                    yield k, x
+            else:
+                yield k, v
+
+    def items(self):
+        return list(self.iteritems())
+
+    def iterkeys(self):
+        return (item[0] for item in self.iteritems())
+
+    __iter__ = iterkeys
+
+    def keys(self):
+        return list(self.iterkeys())
+
+    def itervalues(self):
+        return (item[1] for item in self.iteritems())
+
+    def values(self):
+        return list(self.itervalues())
+
+    def __repr__(self):
+        return "BozoMultiDict(%r)" % self.items()
+
+class Request(zope.publisher.browser.BrowserRequest):
+
+    zope.interface.implements(interfaces.IRequest)
+
+    __slots__ = (
+        '_webob', '_form', '_charsets',
+
+        # We have to repeat the __provides__ slot here to deal with
+        # conflicting magic between a descriptor inserted by the interface
+        # machinery and slots.  A consequence is that any subclass of Request
+        # also has to repeat this slot. Obviously, this is unfortunate. :(
+        '__provides__',
+        )
+
+    def __init__(self, *args):
+        zope.publisher.browser.BrowserRequest.__init__(self, *args)
+        del self._form
+
+    # Form handling is a bit tricky
+    # Goals:
+    # - be backward compat with BrowserRequest and with zope.publisher
+    #   - Can call processInputs after the request is created
+    # - Don't require that processInputs be (externally) called at all.
+    # - Work with the BrowserRequest tests, which assume that form processing
+    #   doesn't happen until processInputs is called.
+    #
+    # Extra credit:
+    # - Don't do form processing unless someone asks for form data.
+    #
+    # Solution:
+    # - After creation:
+    #   processInputs will be called either when called explicitly, or
+    #   the first time someone gets the form attr
+
+    @apply
+    def form():
+        def get_form(self):
+            try:
+                return self._form
+            except AttributeError:
+                self._form = {}
+                self.processInputs()
+                return self._form
+
+        def set_form(self, form):
+            assert not form
+            self._form = form
+
+        return property(get_form, set_form)
+
+    def processInputs(self):
+        self._form = {}
+        super(Request, self).processInputs()
+
+    @classmethod
+    def blank(class_, *args, **kw):
+        environ = webob.Request.blank(*args, **kw).environ
+        return class_(environ['wsgi.input'], environ)
+
+    @property
+    def charsets(self):
+        try:
+            return self._charsets
+        except AttributeError:
+            pass
+
+        adapter = zope.i18n.interfaces.IUserPreferredCharsets(self, None)
+        if adapter is None:
+            adapter = zope.publisher.http.HTTPCharsets(self)
+
+        self._charsets = adapter.getPreferredCharsets() or ['utf-8']
+
+        return self._charsets
+
+    @property
+    def charset(self):
+        return self.charsets[0]
+
+    @property
+    def environ(self):
+        return self._environ
+
+    @property
+    def webob(self):
+        try:
+            return self._webob
+        except AttributeError:
+            w = self._webob = webob.Request(self._environ)
+            if w.charset is None:
+                w.charset = self.charset
+            return w
+
+    @property
+    def headers(self):
+        return self.webob.headers
+
+    def __getattr__(self, name):
+        if name == '_webob':
+            raise AttributeError(name)
+        return getattr(self.webob, name)
+
+    def __setattr__(self, name, value):
+        try:
+            object.__setattr__(self, name, value)
+        except AttributeError:
+            if name == 'charsets':
+                if value is None:
+                    return
+                raise
+            setattr(self.webob, name, value)
+
+    def __delattr__(self, name):
+        try:
+            object.__delattr__(self, name)
+        except AttributeError:
+            delattr(self.webob, name)
+
+    @property
+    def body_file(self):
+        return self.bodyStream
+
+    @property
+    def body(self):
+        stream = self.bodyStream
+        stream.seek(0)
+        return stream.read()
+
+    @property
+    def params(self):
+        return BozoMultiDict(self.form)
+
+    @property
+    def params(self):
+        return BozoMultiDict(self.form)
+
+    GET = params
+
+    @property
+    def POST(self):
+        if self.method in ('POST', 'PUT'):
+            content_type = self._environ.get('CONTENT_TYPE')
+            if content_type and (
+                content_type.startswith('application/x-www-form-urlencoded')
+                or
+                content_type.startswith('multipart/')
+                ):
+                return self.params
+        return webob.multidict.NoVars('Not a POST request')


Property changes on: bozo.reuse/branches/dev/src/bozo/reuse/request.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Modified: bozo.reuse/branches/dev/src/bozo/reuse/tests.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/tests.py	2009-06-07 09:44:12 UTC (rev 100681)
+++ bozo.reuse/branches/dev/src/bozo/reuse/tests.py	2009-06-07 12:34:46 UTC (rev 100682)
@@ -11,12 +11,62 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
+from cStringIO import StringIO
+from zope.testing import doctest, renormalizing
+import bozo.reuse
 import re
 import unittest
-from zope.testing import doctest, renormalizing
+import zope.publisher.tests.test_browserrequest
 
+class BozoBrowserTests(zope.publisher.tests.test_browserrequest.BrowserTests):
+
+    def _createRequest(self, extra_env={}, body=""):
+        env = self._testEnv.copy()
+        env.update(extra_env)
+        if len(body):
+            env['CONTENT_LENGTH'] = str(len(body))
+
+        publication = zope.publisher.tests.test_browserrequest.Publication(
+            self.app)
+        request = bozo.reuse.Request(StringIO(body), env)
+        request.setPublication(publication)
+        return request
+
+    def testHeaders(self):
+        headers = {
+            'HTTP_TEST_HEADER': 'test',
+            'Another-Test': 'another',
+        }
+        req = self._createRequest(extra_env=headers)
+        self.assertEquals(req.headers[u'TEST-HEADER'], u'test')
+        self.assertEquals(req.headers[u'test-header'], u'test')
+        self.assertEquals(req.getHeader('TEST_HEADER', literal=True), u'test')
+        self.assertEquals(req.getHeader('TEST-HEADER', literal=True), None)
+        self.assertEquals(req.getHeader('test_header', literal=True), None)
+        self.assertEquals(req.getHeader('Another-Test', literal=True),
+                          'another')
+
+    def testIssue559(self):
+        extra = {'QUERY_STRING': 'HTTP_REFERER=peter',
+                 'HTTP_REFERER':'http://localhost/',
+                 'PATH_INFO': '/folder/item3/'}
+        request = self._createRequest(extra)
+        zope.publisher.tests.test_browserrequest.publish(request)
+        self.assertEqual(request.headers.get('Referer'), 'http://localhost/')
+        self.assertEqual(request.form, {u'HTTP_REFERER': u'peter'})
+
+class BozoAPITests(zope.publisher.tests.test_browserrequest.BrowserTests):
+
+    def _Test__new(self, environ=None, **kw):
+        if environ is None:
+            environ = kw
+        return bozo.reuse.Request(StringIO(''), environ)
+
+
 def test_suite():
     return unittest.TestSuite((
+        unittest.makeSuite(BozoBrowserTests),
+        unittest.makeSuite(BozoAPITests),
         doctest.DocFileSuite(
             'doc/request.test',
             checker=renormalizing.RENormalizing([



More information about the Checkins mailing list