[Zope-Checkins] SVN: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/ Coverage for ZPublisher.WSGIPublisher.publish_module.

Tres Seaver tseaver at palladion.com
Mon May 31 15:41:13 EDT 2010


Log message for revision 112876:
  Coverage for ZPublisher.WSGIPublisher.publish_module.

Changed:
  U   Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py
  U   Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py

-=-
Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py	2010-05-31 18:26:04 UTC (rev 112875)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py	2010-05-31 19:41:12 UTC (rev 112876)
@@ -15,17 +15,15 @@
 from cStringIO import StringIO
 import time
 
+import transaction
 from zExceptions import Redirect
 from zExceptions import Unauthorized
-from ZServer.medusa.http_date import build_http_date
 from zope.event import notify
-
-from ZPublisher.HTTPResponse import HTTPResponse
-from ZPublisher.HTTPRequest import HTTPRequest
-
-from zope.publisher.interfaces import ISkinnable
 from zope.publisher.skinnable import setDefaultSkin
+from ZServer.medusa.http_date import build_http_date
 
+from ZPublisher.HTTPRequest import HTTPRequest
+from ZPublisher.HTTPResponse import HTTPResponse
 from ZPublisher.mapply import mapply
 from ZPublisher.pubevents import PubBeforeStreaming
 from ZPublisher.Publish import call_object
@@ -148,11 +146,8 @@
         raise NotImplementedError
 
 def publish(request, module_name,
-            _get_module_info=None,  # only for testing
+            _get_module_info=get_module_info,  # only for testing
            ):
-    if _get_module_info is None:
-        _get_module_info = get_module_info
-
     (bobo_before,
      bobo_after,
      object,
@@ -204,23 +199,26 @@
 
     return response
 
-def publish_module(environ, start_response):
+def publish_module(environ, start_response,
+                   _publish=publish,                # only for testing
+                   _response_factory=WSGIResponse,  # only for testing
+                   _request_factory=HTTPRequest,    # only for testing
+                  ):
     status = 200
     stdout = StringIO()
     stderr = StringIO()
-    response = WSGIResponse(stdout=stdout, stderr=stderr)
+    response = _response_factory(stdout=stdout, stderr=stderr)
     response._http_version = environ['SERVER_PROTOCOL'].split('/')[1]
     response._http_connection = environ.get('CONNECTION_TYPE', 'close')
     response._server_version = environ.get('SERVER_SOFTWARE')
 
-    request = HTTPRequest(environ['wsgi.input'], environ, response)
-    if ISkinnable.providedBy(request):
-        setDefaultSkin(request)
+    request = _request_factory(environ['wsgi.input'], environ, response)
+    setDefaultSkin(request)
 
     try:
-        response = publish(request, 'Zope2')
+        response = _publish(request, 'Zope2')
     except Unauthorized, v:
-        pass
+        response._unauthorized()
     except Redirect, v:
         response.redirect(v)
 
@@ -234,10 +232,9 @@
         # If somebody used response.write, that data will be in the
         # stdout StringIO, so we put that before the body.
         # XXX This still needs verification that it really works.
-        result=(stdout.getvalue(), response.body)
+        result = (stdout.getvalue(), response.body)
 
     if 'repoze.tm.active' in environ:
-        import transaction
         txn = transaction.get()
         txn.addAfterCommitHook(lambda ok: request.close())
     else:

Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py	2010-05-31 18:26:04 UTC (rev 112875)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py	2010-05-31 19:41:12 UTC (rev 112876)
@@ -219,6 +219,7 @@
         self._callFUT(request, 'okmodule', _gmi)
         self.assertEqual(response.realm, None)
 
+
 class Test_publish_module(unittest.TestCase):
     
     def setUp(self):
@@ -230,8 +231,21 @@
         from zope.testing.cleanup import cleanUp
         cleanUp()
 
-    def _callFUT(self, environ, start_response):
+    def _callFUT(self, environ, start_response,
+                _publish=None, _response_factory=None, _request_factory=None):
         from ZPublisher.WSGIPublisher import publish_module
+        if _publish is not None:
+            if _response_factory is not None:
+                if _request_factory is not None:
+                    return publish_module(environ, start_response, _publish,
+                                          _response_factory, _request_factory)
+                return publish_module(environ, start_response, _publish,
+                                      _response_factory)
+            else:
+                if _request_factory is not None:
+                    return publish_module(environ, start_response, _publish,
+                                        _request_factory=_request_factory)
+                return publish_module(environ, start_response, _publish)
         return publish_module(environ, start_response)
 
     def _registerView(self, factory, name, provides=None):
@@ -264,7 +278,7 @@
         environ.update(kw)
         return environ
 
-    def test_publish_module_uses_setDefaultSkin(self):
+    def test_calls_setDefaultSkin(self):
         from zope.traversing.interfaces import ITraversable
         from zope.traversing.namespace import view
 
@@ -286,8 +300,113 @@
         environ = self._makeEnviron(PATH_INFO='/@@testing')
         self.assertEqual(self._callFUT(environ, noopStartResponse),
                          ('', 'foobar'))
-    
 
+    def test_publish_can_return_new_response(self):
+        from ZPublisher.HTTPRequest import HTTPRequest
+        _response = DummyResponse()
+        _response.body = 'BODY'
+        _after1 = DummyCallable()
+        _after2 = DummyCallable()
+        _response.after_list = (_after1, _after2)
+        environ = self._makeEnviron()
+        start_response = DummyCallable()
+        _publish = DummyCallable()
+        _publish._result = _response
+        app_iter = self._callFUT(environ, start_response, _publish)
+        self.assertEqual(app_iter, ('', 'BODY'))
+        (status, headers), kw = start_response._called_with
+        self.assertEqual(status, '204 No Content')
+        self.assertEqual(headers, [('Content-Length', '0')])
+        self.assertEqual(kw, {})
+        (request, module), kw = _publish._called_with
+        self.failUnless(isinstance(request, HTTPRequest))
+        self.assertEqual(module, 'Zope2')
+        self.assertEqual(kw, {})
+        self.failUnless(_response._finalized)
+        self.assertEqual(_after1._called_with, ((), {}))
+        self.assertEqual(_after2._called_with, ((), {}))
+
+    def test_swallows_Unauthorized(self):
+        from zExceptions import Unauthorized
+        environ = self._makeEnviron()
+        start_response = DummyCallable()
+        _publish = DummyCallable()
+        _publish._raise = Unauthorized('TESTING')
+        app_iter = self._callFUT(environ, start_response, _publish)
+        self.assertEqual(app_iter, ('', ''))
+        (status, headers), kw = start_response._called_with
+        self.assertEqual(status, '401 Unauthorized')
+        self.failUnless(('Content-Length', '0') in headers)
+        self.assertEqual(kw, {})
+
+    def test_swallows_Redirect(self):
+        from zExceptions import Redirect
+        environ = self._makeEnviron()
+        start_response = DummyCallable()
+        _publish = DummyCallable()
+        _publish._raise = Redirect('/redirect_to')
+        app_iter = self._callFUT(environ, start_response, _publish)
+        self.assertEqual(app_iter, ('', ''))
+        (status, headers), kw = start_response._called_with
+        self.assertEqual(status, '302 Moved Temporarily')
+        self.failUnless(('Location', '/redirect_to') in headers)
+        self.failUnless(('Content-Length', '0') in headers)
+        self.assertEqual(kw, {})
+
+    def test_response_body_is_file(self):
+        class DummyFile(file):
+            def __init__(self):
+                pass
+            def read(self, *args, **kw):
+                raise NotImplementedError()
+        _response = DummyResponse()
+        _response._status = '200 OK'
+        _response._headers = [('Content-Length', '4')]
+        body = _response.body = DummyFile()
+        environ = self._makeEnviron()
+        start_response = DummyCallable()
+        _publish = DummyCallable()
+        _publish._result = _response
+        app_iter = self._callFUT(environ, start_response, _publish)
+        self.failUnless(app_iter is body)
+
+    def test_request_closed_when_tm_middleware_not_active(self):
+        environ = self._makeEnviron()
+        start_response = DummyCallable()
+        _request = DummyRequest()
+        _request._closed = False
+        def _close():
+            _request._closed = True
+        _request.close = _close
+        def _request_factory(stdin, environ, response):
+            return _request
+        _publish = DummyCallable()
+        _publish._result = DummyResponse()
+        app_iter = self._callFUT(environ, start_response, _publish,
+                                 _request_factory=_request_factory)
+        self.failUnless(_request._closed)
+
+    def test_request_not_closed_when_tm_middleware_active(self):
+        import transaction
+        environ = self._makeEnviron()
+        environ['repoze.tm.active'] = 1
+        start_response = DummyCallable()
+        _request = DummyRequest()
+        _request._closed = False
+        def _close():
+            _request._closed = True
+        _request.close = _close
+        def _request_factory(stdin, environ, response):
+            return _request
+        _publish = DummyCallable()
+        _publish._result = DummyResponse()
+        app_iter = self._callFUT(environ, start_response, _publish,
+                                 _request_factory=_request_factory)
+        self.failIf(_request._closed)
+        txn = transaction.get()
+        self.failUnless(list(txn.getAfterCommitHooks()))
+
+
 class DummyRequest(dict):
     _processedInputs = False
     _traversed = None
@@ -306,10 +425,19 @@
     after_list = ()
     realm = None
     _body = None
+    _finalized = False
+    _status = '204 No Content'
+    _headers = [('Content-Length', '0')]
 
+    def finalize(self):
+        self._finalized = True
+        return self._status, self._headers
+
     def setBody(self, body):
         self._body = body
 
+    body = property(lambda self: self._body, setBody)
+
 class DummyCallable(object):
     _called_with = _raise = _result = None
 



More information about the Zope-Checkins mailing list