[Checkins] SVN: bozo.reuse/branches/dev/src/bozo/reuse/ Implemented request.
Jim Fulton
jim at zope.com
Sun Jun 7 08:34:46 EDT 2009
Log message for revision 100682:
Implemented request.
Changed:
U bozo.reuse/branches/dev/src/bozo/reuse/__init__.py
D bozo.reuse/branches/dev/src/bozo/reuse/application.py
A bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py
A bozo.reuse/branches/dev/src/bozo/reuse/request.py
U bozo.reuse/branches/dev/src/bozo/reuse/tests.py
-=-
Modified: bozo.reuse/branches/dev/src/bozo/reuse/__init__.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/__init__.py 2009-06-07 09:44:12 UTC (rev 100681)
+++ bozo.reuse/branches/dev/src/bozo/reuse/__init__.py 2009-06-07 12:34:46 UTC (rev 100682)
@@ -1 +1,16 @@
+##############################################################################
#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from bozo.reuse import interfaces # Gaaah, circular imports
+from bozo.reuse.request import Request
Deleted: bozo.reuse/branches/dev/src/bozo/reuse/application.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/application.py 2009-06-07 09:44:12 UTC (rev 100681)
+++ bozo.reuse/branches/dev/src/bozo/reuse/application.py 2009-06-07 12:34:46 UTC (rev 100682)
@@ -1,238 +0,0 @@
-##############################################################################
-#
-# Copyright Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-
-import bobo
-import zope.event
-import zope.i18n.interfaces
-import zope.publisher.browser
-import zope.publisher.http
-import webob
-import webob.multidict
-
-class BozoMultiDict:
-
- def __init__(self, data):
- self.data = data
-
- def __getitem__(self, k):
- v = self.data[k]
- if isinstance(v, list):
- return v[-1]
- return v
-
- def getall(self, name):
- value = self.data.get(name)
- if value is None:
- return []
- if isinstance(value, list):
- return value
- return [value]
-
- def getone(self, name):
- value = self.data.get(name)
- if value is None:
- raise KeyError(name)
- if isinstance(value, list):
- if len(value) != 1:
- raise KeyError(name)
- value = value[0]
- return value
-
- def mixed(self):
- return self.data
-
- def dict_of_lists(self):
- return dict((k, self.getall(k)) for k in self.data)
-
- def __contains__(self, k):
- return k in self.data
-
- def copy(self):
- return self.__class__(self.data.copy())
-
- def __len__(self):
- return len(self.data)
-
- def iteritems(self):
- data = self.data
- for k in sorted(data):
- v = data[k]
- if isinstance(v, list):
- for x in v:
- yield k, x
- else:
- yield k, v
-
- def items(self):
- return list(self.iteritems())
-
- def iterkeys(self):
- return (item[0] for item in self.iteritems())
-
- __iter__ = iterkeys
-
- def keys(self):
- return list(self.iterkeys())
-
- def itervalues(self):
- return (item[1] for item in self.iteritems())
-
- def values(self):
- return list(self.itervalues())
-
- def __repr__(self):
- return "BozoMultiDict(%r)" % self.items()
-
-class Request(zope.publisher.browser.BrowserRequest):
-
- __slots__ = ('_webob', '_form', '_charsets')
-
- def __init__(self, *args):
- zope.publisher.browser.BrowserRequest.__init__(self, *args)
- self.processInputs()
-
- @classmethod
- def blank(class_, *args, **kw):
- environ = webob.Request.blank(*args, **kw).environ
- return class_(environ['wsgi.input'], environ)
-
- @property
- def charsets(self):
- try:
- return self._charsets
- except AttributeError:
- pass
-
- adapter = zope.i18n.interfaces.IUserPreferredCharsets(self, None)
- if adapter is None:
- adapter = zope.publisher.http.HTTPCharsets(self)
-
- self._charsets = adapter.getPreferredCharsets() or ['utf-8']
-
- return self._charsets
-
- @property
- def charset(self):
- return self.charsets[0]
-
- @property
- def environ(self):
- return self._environ
-
- @property
- def webob(self):
- try:
- return self._webob
- except AttributeError:
- w = self._webob = webob.Request(self._environ)
- if w.charset is None:
- w.charset = self.charset
- return w
-
- @property
- def headers(self):
- return self.webob.headers
-
- def __getattr__(self, name):
- if name == '_webob':
- raise AttributeError(name)
- return getattr(self.webob, name)
-
- def __setattr__(self, name, value):
- try:
- object.__setattr__(self, name, value)
- except AttributeError:
- if name == 'charsets':
- if value is None:
- return
- raise
- setattr(self.webob, name, value)
-
- def __delattr__(self, name):
- try:
- object.__delattr__(self, name)
- except AttributeError:
- delattr(self.webob, name)
-
- @property
- def body_file(self):
- return self.bodyStream
-
- @property
- def body(self):
- stream = self.bodyStream
- stream.seek(0)
- return stream.read()
-
- @property
- def params(self):
- return BozoMultiDict(self.form)
-
- @property
- def params(self):
- return BozoMultiDict(self.form)
-
- GET = params
-
- @property
- def POST(self):
- if self.method in ('POST', 'PUT'):
- content_type = self._environ.get('CONTENT_TYPE')
- if content_type and (
- content_type.startswith('application/x-www-form-urlencoded')
- or
- content_type.startswith('multipart/')
- ):
- return self.params
- return webob.multidict.NoVars('Not a POST request')
-
-
-class Application(bobo.Application):
-
- def __call__(self, environ, start_response):
- request = Request(environ['wsgi.input'], environ)
- zope.event.notify(BeginRequest(request))
- try:
- return self.bobo_response(
- request, request.path_info, request.method
- )(environ, start_response)
- finally:
- zope.event.notify(EndRequest(request))
-
- def build_response(self, request, method, data):
- content_type = data.content_type
- response = request.response
- if content_type:
- response.setHeader('Content-Type', content_type)
- for name, value in data.headers:
- response.setHeader(name, value)
- result = data.body
- if result is not response:
- response.setResult(result)
-
- def wsgiresponse(environ, start_response):
- start_response(response.getStatusString(), response.getHeaders())
- return response.consumeBodyIter()
-
- return wsgiresponse
-
-class RequestEvent:
- def __init__(self, request):
- self.request = request
-
-class BeginRequest(RequestEvent):
- "A request has begun"
-
-class EndRequest(RequestEvent):
- "A request has finished processing"
Added: bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py (rev 0)
+++ bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py 2009-06-07 12:34:46 UTC (rev 100682)
@@ -0,0 +1,33 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import bobo
+import zope.interface
+import zope.publisher.interfaces.browser
+
+class IResource(zope.interface.Interface):
+
+ def bobo_response(request, path, method):
+ """Return a response for the given request, path and method
+
+ The response is a callable with the same signature as a WSGI
+ application.
+ """
+
+# XXX bobo should expose something we can use:
+zope.interface.classImplements(bobo._Handler, IResource)
+
+class IRequest(zope.publisher.interfaces.browser.IBrowserRequest):
+ """Browser requests that also provide the webob.Request API
+ """
Property changes on: bozo.reuse/branches/dev/src/bozo/reuse/interfaces.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: bozo.reuse/branches/dev/src/bozo/reuse/request.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/request.py (rev 0)
+++ bozo.reuse/branches/dev/src/bozo/reuse/request.py 2009-06-07 12:34:46 UTC (rev 100682)
@@ -0,0 +1,244 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import interfaces
+import webob
+import webob.multidict
+import zope.i18n.interfaces
+import zope.interface
+import zope.publisher.browser
+import zope.publisher.http
+
+class BozoMultiDict:
+
+ def __init__(self, data):
+ self.data = data
+
+ def __getitem__(self, k):
+ v = self.data[k]
+ if isinstance(v, list):
+ return v[-1]
+ return v
+
+ def getall(self, name):
+ value = self.data.get(name)
+ if value is None:
+ return []
+ if isinstance(value, list):
+ return value
+ return [value]
+
+ def getone(self, name):
+ value = self.data.get(name)
+ if value is None:
+ raise KeyError(name)
+ if isinstance(value, list):
+ if len(value) != 1:
+ raise KeyError(name)
+ value = value[0]
+ return value
+
+ def mixed(self):
+ return self.data
+
+ def dict_of_lists(self):
+ return dict((k, self.getall(k)) for k in self.data)
+
+ def __contains__(self, k):
+ return k in self.data
+
+ def copy(self):
+ return self.__class__(self.data.copy())
+
+ def __len__(self):
+ return len(self.data)
+
+ def iteritems(self):
+ data = self.data
+ for k in sorted(data):
+ v = data[k]
+ if isinstance(v, list):
+ for x in v:
+ yield k, x
+ else:
+ yield k, v
+
+ def items(self):
+ return list(self.iteritems())
+
+ def iterkeys(self):
+ return (item[0] for item in self.iteritems())
+
+ __iter__ = iterkeys
+
+ def keys(self):
+ return list(self.iterkeys())
+
+ def itervalues(self):
+ return (item[1] for item in self.iteritems())
+
+ def values(self):
+ return list(self.itervalues())
+
+ def __repr__(self):
+ return "BozoMultiDict(%r)" % self.items()
+
+class Request(zope.publisher.browser.BrowserRequest):
+
+ zope.interface.implements(interfaces.IRequest)
+
+ __slots__ = (
+ '_webob', '_form', '_charsets',
+
+ # We have to repeat the __provides__ slot here to deal with
+ # conflicting magic between a descriptor inserted by the interface
+ # machinery and slots. A consequence is that any subclass of Request
+ # also has to repeat this slot. Obviously, this is unfortunate. :(
+ '__provides__',
+ )
+
+ def __init__(self, *args):
+ zope.publisher.browser.BrowserRequest.__init__(self, *args)
+ del self._form
+
+ # Form handling is a bit tricky
+ # Goals:
+ # - be backward compat with BrowserRequest and with zope.publisher
+ # - Can call processInputs after the request is created
+ # - Don't require that processInputs be (externally) called at all.
+ # - Work with the BrowserRequest tests, which assume that form processing
+ # doesn't happen until processInputs is called.
+ #
+ # Extra credit:
+ # - Don't do form processing unless someone asks for form data.
+ #
+ # Solution:
+ # - After creation:
+ # processInputs will be called either when called explicitly, or
+ # the first time someone gets the form attr
+
+ @apply
+ def form():
+ def get_form(self):
+ try:
+ return self._form
+ except AttributeError:
+ self._form = {}
+ self.processInputs()
+ return self._form
+
+ def set_form(self, form):
+ assert not form
+ self._form = form
+
+ return property(get_form, set_form)
+
+ def processInputs(self):
+ self._form = {}
+ super(Request, self).processInputs()
+
+ @classmethod
+ def blank(class_, *args, **kw):
+ environ = webob.Request.blank(*args, **kw).environ
+ return class_(environ['wsgi.input'], environ)
+
+ @property
+ def charsets(self):
+ try:
+ return self._charsets
+ except AttributeError:
+ pass
+
+ adapter = zope.i18n.interfaces.IUserPreferredCharsets(self, None)
+ if adapter is None:
+ adapter = zope.publisher.http.HTTPCharsets(self)
+
+ self._charsets = adapter.getPreferredCharsets() or ['utf-8']
+
+ return self._charsets
+
+ @property
+ def charset(self):
+ return self.charsets[0]
+
+ @property
+ def environ(self):
+ return self._environ
+
+ @property
+ def webob(self):
+ try:
+ return self._webob
+ except AttributeError:
+ w = self._webob = webob.Request(self._environ)
+ if w.charset is None:
+ w.charset = self.charset
+ return w
+
+ @property
+ def headers(self):
+ return self.webob.headers
+
+ def __getattr__(self, name):
+ if name == '_webob':
+ raise AttributeError(name)
+ return getattr(self.webob, name)
+
+ def __setattr__(self, name, value):
+ try:
+ object.__setattr__(self, name, value)
+ except AttributeError:
+ if name == 'charsets':
+ if value is None:
+ return
+ raise
+ setattr(self.webob, name, value)
+
+ def __delattr__(self, name):
+ try:
+ object.__delattr__(self, name)
+ except AttributeError:
+ delattr(self.webob, name)
+
+ @property
+ def body_file(self):
+ return self.bodyStream
+
+ @property
+ def body(self):
+ stream = self.bodyStream
+ stream.seek(0)
+ return stream.read()
+
+ @property
+ def params(self):
+ return BozoMultiDict(self.form)
+
+ @property
+ def params(self):
+ return BozoMultiDict(self.form)
+
+ GET = params
+
+ @property
+ def POST(self):
+ if self.method in ('POST', 'PUT'):
+ content_type = self._environ.get('CONTENT_TYPE')
+ if content_type and (
+ content_type.startswith('application/x-www-form-urlencoded')
+ or
+ content_type.startswith('multipart/')
+ ):
+ return self.params
+ return webob.multidict.NoVars('Not a POST request')
Property changes on: bozo.reuse/branches/dev/src/bozo/reuse/request.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Modified: bozo.reuse/branches/dev/src/bozo/reuse/tests.py
===================================================================
--- bozo.reuse/branches/dev/src/bozo/reuse/tests.py 2009-06-07 09:44:12 UTC (rev 100681)
+++ bozo.reuse/branches/dev/src/bozo/reuse/tests.py 2009-06-07 12:34:46 UTC (rev 100682)
@@ -11,12 +11,62 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
+from cStringIO import StringIO
+from zope.testing import doctest, renormalizing
+import bozo.reuse
import re
import unittest
-from zope.testing import doctest, renormalizing
+import zope.publisher.tests.test_browserrequest
+class BozoBrowserTests(zope.publisher.tests.test_browserrequest.BrowserTests):
+
+ def _createRequest(self, extra_env={}, body=""):
+ env = self._testEnv.copy()
+ env.update(extra_env)
+ if len(body):
+ env['CONTENT_LENGTH'] = str(len(body))
+
+ publication = zope.publisher.tests.test_browserrequest.Publication(
+ self.app)
+ request = bozo.reuse.Request(StringIO(body), env)
+ request.setPublication(publication)
+ return request
+
+ def testHeaders(self):
+ headers = {
+ 'HTTP_TEST_HEADER': 'test',
+ 'Another-Test': 'another',
+ }
+ req = self._createRequest(extra_env=headers)
+ self.assertEquals(req.headers[u'TEST-HEADER'], u'test')
+ self.assertEquals(req.headers[u'test-header'], u'test')
+ self.assertEquals(req.getHeader('TEST_HEADER', literal=True), u'test')
+ self.assertEquals(req.getHeader('TEST-HEADER', literal=True), None)
+ self.assertEquals(req.getHeader('test_header', literal=True), None)
+ self.assertEquals(req.getHeader('Another-Test', literal=True),
+ 'another')
+
+ def testIssue559(self):
+ extra = {'QUERY_STRING': 'HTTP_REFERER=peter',
+ 'HTTP_REFERER':'http://localhost/',
+ 'PATH_INFO': '/folder/item3/'}
+ request = self._createRequest(extra)
+ zope.publisher.tests.test_browserrequest.publish(request)
+ self.assertEqual(request.headers.get('Referer'), 'http://localhost/')
+ self.assertEqual(request.form, {u'HTTP_REFERER': u'peter'})
+
+class BozoAPITests(zope.publisher.tests.test_browserrequest.BrowserTests):
+
+ def _Test__new(self, environ=None, **kw):
+ if environ is None:
+ environ = kw
+ return bozo.reuse.Request(StringIO(''), environ)
+
+
def test_suite():
return unittest.TestSuite((
+ unittest.makeSuite(BozoBrowserTests),
+ unittest.makeSuite(BozoAPITests),
doctest.DocFileSuite(
'doc/request.test',
checker=renormalizing.RENormalizing([
More information about the Checkins
mailing list