[Zope-Checkins] SVN: Zope/trunk/src/ZPublisher/ Merge r107029 from the tseaver-fix_wsgi branch.

Tres Seaver tseaver at palladion.com
Tue Jun 1 17:22:15 EDT 2010


Log message for revision 112893:
  Merge r107029 from the tseaver-fix_wsgi branch.
  
  - Start test coverage for WSGIPublisher.
  

Changed:
  U   Zope/trunk/src/ZPublisher/WSGIPublisher.py
  A   Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py

-=-
Modified: Zope/trunk/src/ZPublisher/WSGIPublisher.py
===================================================================
--- Zope/trunk/src/ZPublisher/WSGIPublisher.py	2010-06-01 21:08:52 UTC (rev 112892)
+++ Zope/trunk/src/ZPublisher/WSGIPublisher.py	2010-06-01 21:22:15 UTC (rev 112893)
@@ -13,7 +13,6 @@
 """ Python Object Publisher -- Publish Python objects on web servers
 """
 from cStringIO import StringIO
-import re
 import sys
 import time
 
@@ -25,6 +24,12 @@
 from ZPublisher.maybe_lock import allocate_lock
 from ZPublisher.mapply import mapply
 
+_NOW = None     # overwrite for testing
+def _now():
+    if _NOW is not None:
+        return _NOW
+    return time.time()
+
 class WSGIResponse(HTTPResponse):
     """A response object for WSGI
 
@@ -34,57 +39,53 @@
     Most significantly, streaming is not (yet) supported.
     """
     _streaming = 0
+    _http_version = None
+    _server_version = None
+    _http_connection = None
     
-    def __str__(self,
-                html_search=re.compile('<html>',re.I).search,
-                ):
+    def __str__(self):
+
         if self._wrote:
             if self._chunking:
                 return '0\r\n\r\n'
             else:
                 return ''
 
-        headers=self.headers
-        body=self.body
+        headers = self.headers
+        body = self.body
 
         # set 204 (no content) status if 200 and response is empty
         # and not streaming
-        if not headers.has_key('content-type') and \
-                not headers.has_key('content-length') and \
-                not self._streaming and \
-                self.status == 200:
+        if ('content-type' not in headers and 
+            'content-length' not in headers and 
+            not self._streaming and self.status == 200):
             self.setStatus('nocontent')
 
         # add content length if not streaming
-        if not headers.has_key('content-length') and \
-                not self._streaming:
-            self.setHeader('content-length',len(body))
+        content_length = headers.get('content-length')
 
+        if content_length is None and not self._streaming:
+            self.setHeader('content-length', len(body))
 
-        content_length= headers.get('content-length', None)
-        if content_length>0 :
-            self.setHeader('content-length', content_length)
+        chunks = []
+        append = chunks.append
 
-        headersl=[]
-        append=headersl.append
-
-        status=headers.get('status', '200 OK')
-
         # status header must come first.
-        append("HTTP/%s %s" % (self._http_version or '1.0' , status))
-        if headers.has_key('status'):
-            del headers['status']
+        version = self._http_version or '1.0'
+        append("HTTP/%s %d %s" % (version, self.status, self.errmsg))
 
         # add zserver headers
-        append('Server: %s' % self._server_version)
-        append('Date: %s' % build_http_date(time.time()))
+        if self._server_version is not None:
+            append('Server: %s' % self._server_version)
 
-        if self._http_version=='1.0':
-            if self._http_connection=='keep-alive' and \
-                    self.headers.has_key('content-length'):
-                self.setHeader('Connection','Keep-Alive')
+        append('Date: %s' % build_http_date(_now()))
+
+        if self._http_version == '1.0':
+            if (self._http_connection == 'keep-alive' and 
+                'content-length' in self.headers):
+                self.setHeader('Connection', 'Keep-Alive')
             else:
-                self.setHeader('Connection','close')
+                self.setHeader('Connection', 'close')
 
         # Close the connection if we have been asked to.
         # Use chunking if streaming output.
@@ -109,11 +110,18 @@
                     start=l+1
                     l=key.find('-',start)
             append("%s: %s" % (key, val))
+
         if self.cookies:
-            headersl=headersl+self._cookie_list()
-        headersl[len(headersl):]=[self.accumulated_headers, body]
-        return "\r\n".join(headersl)
+            chunks.extend(self._cookie_list())
 
+        for key, value in self.accumulated_headers:
+            append("%s: %s" % (key, value))
+
+        append('') # RFC 2616 mandates empty line between headers and payload
+        append(body)
+
+        return "\r\n".join(chunks)
+
     
 class Retry(Exception):
     """Raise this to retry a request

Added: Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py
===================================================================
--- Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py	                        (rev 0)
+++ Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py	2010-06-01 21:22:15 UTC (rev 112893)
@@ -0,0 +1,135 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import unittest
+
+class WSGIResponseTests(unittest.TestCase):
+
+    _old_NOW = None
+
+    def tearDown(self):
+        if self._old_NOW is not None:
+            self._setNOW(self._old_NOW)
+
+    def _getTargetClass(self):
+        from ZPublisher.WSGIPublisher import WSGIResponse
+        return WSGIResponse
+
+    def _makeOne(self, *args, **kw):
+        return self._getTargetClass()(*args, **kw)
+
+    def _setNOW(self, value):
+        from ZPublisher import WSGIPublisher
+        WSGIPublisher._NOW, self._old_NOW = value, WSGIPublisher._NOW
+
+    def test___str__already_wrote_not_chunking(self):
+        response = self._makeOne()
+        response._wrote = True
+        response._chunking = False
+        self.assertEqual(str(response), '')
+
+    def test___str__already_wrote_w_chunking(self):
+        response = self._makeOne()
+        response._wrote = True
+        response._chunking = True
+        self.assertEqual(str(response), '0\r\n\r\n')
+
+    def test___str__sets_204_on_empty_not_streaming(self):
+        response = self._makeOne()
+        str(response) # not checking value
+        self.assertEqual(response.status, 204)
+
+    def test___str__sets_204_on_empty_not_streaming_ignores_non_200(self):
+        response = self._makeOne()
+        response.setStatus(302)
+        str(response) # not checking value
+        self.assertEqual(response.status, 302)
+
+    def test___str___sets_content_length_if_missing(self):
+        response = self._makeOne()
+        response.setBody('TESTING')
+        str(response) # not checking value
+        self.assertEqual(response.getHeader('Content-Length'),
+                         str(len('TESTING')))
+
+    def test___str___skips_setting_content_length_if_missing_w_streaming(self):
+        response = self._makeOne()
+        response._streaming = True
+        response.body = 'TESTING'
+        str(response) # not checking value
+        self.failIf(response.getHeader('Content-Length'))
+
+    def test___str___w_default_http_version(self):
+        response = self._makeOne()
+        response.setBody('TESTING')
+        result = str(response).splitlines()
+        self.assertEqual(result[0], 'HTTP/1.0 200 OK')
+
+    def test___str___w_explicit_http_version(self):
+        response = self._makeOne()
+        response.setBody('TESTING')
+        response._http_version = '1.1'
+        result = str(response).splitlines()
+        self.assertEqual(result[0], 'HTTP/1.1 200 OK')
+
+    def test___str___skips_Server_header_wo_server_version_set(self):
+        response = self._makeOne()
+        response.setBody('TESTING')
+        result = str(response).splitlines()
+        sv = [x for x in result if x.lower().startswith('server-version')]
+        self.failIf(sv)
+
+    def test___str___includes_Server_header_w_server_version_set(self):
+        response = self._makeOne()
+        response._server_version = 'TESTME'
+        response.setBody('TESTING')
+        result = str(response).splitlines()
+        self.assertEqual(result[1], 'Server: TESTME')
+
+    def test___str___includes_Date_header(self):
+        import time
+        WHEN = time.localtime()
+        self._setNOW(time.mktime(WHEN))
+        response = self._makeOne()
+        response.setBody('TESTING')
+        result = str(response).splitlines()
+        self.assertEqual(result[1], 'Date: %s' %
+                time.strftime('%a, %d %b %Y %H:%M:%S GMT',
+                              time.gmtime(time.mktime(WHEN))))
+
+    def test___str___HTTP_1_0_keep_alive_w_content_length(self):
+        response = self._makeOne()
+        response._http_version = '1.0'
+        response._http_connection = 'keep-alive'
+        response.setBody('TESTING')
+        str(response) # not checking value
+        self.assertEqual(response.getHeader('Connection'), 'Keep-Alive')
+
+    def test___str___HTTP_1_0_keep_alive_wo_content_length_streaming(self):
+        response = self._makeOne()
+        response._http_version = '1.0'
+        response._http_connection = 'keep-alive'
+        response._streaming = True
+        str(response) # not checking value
+        self.assertEqual(response.getHeader('Connection'), 'close')
+
+    def test___str___HTTP_1_0_not_keep_alive_w_content_length(self):
+        response = self._makeOne()
+        response._http_version = '1.0'
+        response.setBody('TESTING')
+        str(response) # not checking value
+        self.assertEqual(response.getHeader('Connection'), 'close')
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(WSGIResponseTests))
+    return suite



More information about the Zope-Checkins mailing list