[Zope3-checkins] CVS: Zope3/src/zope/server/http/tests - __init__.py:1.1.2.1 test_httprequestparser.py:1.1.2.1 test_httpserver.py:1.1.2.1 test_publisherserver.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:33:23 -0500


Update of /cvs-repository/Zope3/src/zope/server/http/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/server/http/tests

Added Files:
      Tag: NameGeddon-branch
	__init__.py test_httprequestparser.py test_httpserver.py 
	test_publisherserver.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zope/server/http/tests/__init__.py ===
#
# This file is necessary to make this directory a package.


=== Added File Zope3/src/zope/server/http/tests/test_httprequestparser.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: test_httprequestparser.py,v 1.1.2.1 2002/12/23 19:33:21 jim Exp $
"""

import unittest
from zope.server.http.httprequestparser import HTTPRequestParser
from zope.server.adjustments import Adjustments


my_adj = Adjustments()

class Tests(unittest.TestCase):

    def setUp(self):
        self.parser = HTTPRequestParser(my_adj)

    def feed(self, data):
        parser = self.parser
        for n in xrange(100): # make sure we never loop forever
            consumed = parser.received(data)
            data = data[consumed:]
            if parser.completed:
                return
        raise ValueError, 'Looping'

    def testSimpleGET(self):
        data = """\
GET /foobar HTTP/8.4
FirstName: mickey
lastname: Mouse
content-length: 7

Hello.
"""
        parser = self.parser
        self.feed(data)
        self.failUnless(parser.completed)
        self.assertEqual(parser.version, '8.4')
        self.failIf(parser.empty)
        self.assertEqual(parser.headers,
                         {'FIRSTNAME': 'mickey',
                          'LASTNAME': 'Mouse',
                          'CONTENT_LENGTH': '7',
                          })
        self.assertEqual(parser.path, '/foobar')
        self.assertEqual(parser.command, 'GET')
        self.assertEqual(parser.query, None)
        self.assertEqual(parser.getBodyStream().getvalue(), 'Hello.\n')

    def testComplexGET(self):
        data = """\
GET /foo/a+%2B%2F%3D%26a%3Aint?d=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6 HTTP/8.4
FirstName: mickey
lastname: Mouse
content-length: 10

Hello mickey.
"""
        parser = self.parser
        self.feed(data)
        self.assertEqual(parser.command, 'GET')
        self.assertEqual(parser.version, '8.4')
        self.failIf(parser.empty)
        self.assertEqual(parser.headers,
                         {'FIRSTNAME': 'mickey',
                          'LASTNAME': 'Mouse',
                          'CONTENT_LENGTH': '10',
                          })
        self.assertEqual(parser.path, '/foo/a++/=&a:int')
        self.assertEqual(parser.query, 'd=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6')
        self.assertEqual(parser.getBodyStream().getvalue(), 'Hello mick')


def test_suite():
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(Tests)

if __name__=='__main__':
    unittest.TextTestRunner().run( test_suite() )


=== Added File Zope3/src/zope/server/http/tests/test_httpserver.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: test_httpserver.py,v 1.1.2.1 2002/12/23 19:33:21 jim Exp $
"""

import unittest
from asyncore import socket_map, poll
import socket

from threading import Thread
from zope.server.taskthreads import ThreadedTaskDispatcher
from zope.server.http.httpserver import HTTPServer
from zope.server.adjustments import Adjustments
from zope.server.interfaces.interfaces import ITask
from zope.server.tests.asyncerror import AsyncoreErrorHook

from httplib import HTTPConnection
from httplib import HTTPResponse as ClientHTTPResponse

from time import sleep, time

td = ThreadedTaskDispatcher()

LOCALHOST = '127.0.0.1'
SERVER_PORT = 0      # Set these port numbers to 0 to auto-bind, or
CONNECT_TO_PORT = 0  # use specific numbers to inspect using TCPWatch.


my_adj = Adjustments()
# Reduce overflows to make testing easier.
my_adj.outbuf_overflow = 10000
my_adj.inbuf_overflow = 10000


class EchoHTTPServer(HTTPServer):

    def executeRequest(self, task):
        headers = task.request_data.headers
        if 'CONTENT_LENGTH' in headers:
            cl = headers['CONTENT_LENGTH']
            task.response_headers['Content-Length'] = cl
        instream = task.request_data.getBodyStream()
        while 1:
            data = instream.read(8192)
            if not data:
                break
            task.write(data)


class SleepingTask:

    __implements__ = ITask

    def service(self):
        sleep(0.2)

    def cancel(self):
        pass

    def defer(self):
        pass


class Tests(unittest.TestCase, AsyncoreErrorHook):

    def setUp(self):
        td.setThreadCount(4)
        self.orig_map_size = len(socket_map)
        self.hook_asyncore_error()
        self.server = EchoHTTPServer(LOCALHOST, SERVER_PORT,
                                     task_dispatcher=td, adj=my_adj)
        if CONNECT_TO_PORT == 0:
            self.port = self.server.socket.getsockname()[1]
        else:
            self.port = CONNECT_TO_PORT
        self.run_loop = 1
        self.counter = 0
        self.thread = Thread(target=self.loop)
        self.thread.start()
        sleep(0.1)  # Give the thread some time to start.

    def tearDown(self):
        self.run_loop = 0
        self.thread.join()
        td.shutdown()
        self.server.close()
        # Make sure all sockets get closed by asyncore normally.
        timeout = time() + 5
        while 1:
            if len(socket_map) == self.orig_map_size:
                # Clean!
                break
            if time() >= timeout:
                self.fail('Leaked a socket: %s' % `socket_map`)
            poll(0.1)
        self.unhook_asyncore_error()

    def loop(self):
        while self.run_loop:
            self.counter = self.counter + 1
            #print 'loop', self.counter
            poll(0.1)

    def testEchoResponse(self, h=None, add_headers=None, body=''):
        if h is None:
            h = HTTPConnection(LOCALHOST, self.port)
        h.putrequest('GET', '/')
        h.putheader('Accept', 'text/plain')
        if add_headers:
            for k, v in add_headers.items():
                h.putheader(k, v)
        if body:
            h.putheader('Content-Length', str(int(len(body))))
        h.endheaders()
        if body:
            h.send(body)
        response = h.getresponse()
        self.failUnlessEqual(int(response.status), 200)
        length = int(response.getheader('Content-Length', '0'))
        response_body = response.read()
        self.failUnlessEqual(length, len(response_body))
        self.failUnlessEqual(response_body, body)

    def testMultipleRequestsWithoutBody(self):
        # Tests the use of multiple requests in a single connection.
        h = HTTPConnection(LOCALHOST, self.port)
        for n in range(3):
            self.testEchoResponse(h)
        self.testEchoResponse(h, {'Connection': 'close'})

    def testMultipleRequestsWithBody(self):
        # Tests the use of multiple requests in a single connection.
        h = HTTPConnection(LOCALHOST, self.port)
        for n in range(3):
            self.testEchoResponse(h, body='Hello, world!')
        self.testEchoResponse(h, {'Connection': 'close'})

    def testPipelining(self):
        # Tests the use of several requests issued at once.
        s = ("GET / HTTP/1.0\r\n"
             "Connection: %s\r\n"
             "Content-Length: %d\r\n"
             "\r\n"
             "%s")
        to_send = ''
        count = 25
        for n in range(count):
            body = "Response #%d\r\n" % (n + 1)
            if n + 1 < count:
                conn = 'keep-alive'
            else:
                conn = 'close'
            to_send += s % (conn, len(body), body)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((LOCALHOST, self.port))
        sock.send(to_send)
        for n in range(count):
            expect_body = "Response #%d\r\n" % (n + 1)
            response = ClientHTTPResponse(sock)
            response.begin()
            self.failUnlessEqual(int(response.status), 200)
            length = int(response.getheader('Content-Length', '0'))
            response_body = response.read(length)
            self.failUnlessEqual(length, len(response_body))
            self.failUnlessEqual(response_body, expect_body)

    def testWithoutCRLF(self):
        # Tests the use of just newlines rather than CR/LFs.
        data = "Echo\nthis\r\nplease"
        s = ("GET / HTTP/1.0\n"
             "Connection: close\n"
             "Content-Length: %d\n"
             "\n"
             "%s") % (len(data), data)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((LOCALHOST, self.port))
        sock.send(s)
        response = ClientHTTPResponse(sock)
        response.begin()
        self.failUnlessEqual(int(response.status), 200)
        length = int(response.getheader('Content-Length', '0'))
        response_body = response.read(length)
        self.failUnlessEqual(length, len(data))
        self.failUnlessEqual(response_body, data)

    def testLargeBody(self):
        # Tests the use of multiple requests in a single connection.
        h = HTTPConnection(LOCALHOST, self.port)
        s = 'This string has 32 characters.\r\n' * 32  # 1024 characters.
        self.testEchoResponse(h, body=(s * 1024))  # 1 MB
        self.testEchoResponse(h, {'Connection': 'close'},
                              body=(s * 100))  # 100 KB

    def testManyClients(self):
        conns = []
        for n in range(50):  # Linux kernel (2.4.8) doesn't like > 128 ?
            #print 'open', n, clock()
            h = HTTPConnection(LOCALHOST, self.port)
            #h.debuglevel = 1
            h.putrequest('GET', '/')
            h.putheader('Accept', 'text/plain')
            h.endheaders()
            conns.append(h)
            # If you uncomment the next line, you can raise the
            # number of connections much higher without running
            # into delays.
            #sleep(0.01)
        responses = []
        for h in conns:
            response = h.getresponse()
            self.failUnlessEqual(response.status, 200)
            responses.append(response)
        for response in responses:
            response.read()

    def testThreading(self):
        # Ensures the correct number of threads keep running.
        for n in range(4):
            td.addTask(SleepingTask())
        # Try to confuse the task manager.
        td.setThreadCount(2)
        td.setThreadCount(1)
        sleep(0.5)
        # There should be 1 still running.
        self.failUnlessEqual(len(td.threads), 1)

    def testChunkingRequestWithoutContent(self):
        h = HTTPConnection(LOCALHOST, self.port)
        h.putrequest('GET', '/')
        h.putheader('Accept', 'text/plain')
        h.putheader('Transfer-Encoding', 'chunked')
        h.endheaders()
        h.send("0\r\n\r\n")
        response = h.getresponse()
        self.failUnlessEqual(int(response.status), 200)
        response_body = response.read()
        self.failUnlessEqual(response_body, '')

    def testChunkingRequestWithContent(self):
        control_line="20;\r\n"  # 20 hex = 32 dec
        s = 'This string has 32 characters.\r\n'
        expect = s * 12

        h = HTTPConnection(LOCALHOST, self.port)
        h.putrequest('GET', '/')
        h.putheader('Accept', 'text/plain')
        h.putheader('Transfer-Encoding', 'chunked')
        h.endheaders()
        for n in range(12):
            h.send(control_line)
            h.send(s)
        h.send("0\r\n\r\n")
        response = h.getresponse()
        self.failUnlessEqual(int(response.status), 200)
        response_body = response.read()
        self.failUnlessEqual(response_body, expect)


def test_suite():
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(Tests)

if __name__=='__main__':
    unittest.TextTestRunner().run( test_suite() )


=== Added File Zope3/src/zope/server/http/tests/test_publisherserver.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
##############################################################################
"""

$Id: test_publisherserver.py,v 1.1.2.1 2002/12/23 19:33:21 jim Exp $
"""

import unittest
from asyncore import socket_map, poll
import sys
from threading import Thread

from zope.server.taskthreads import ThreadedTaskDispatcher
from zope.server.http.publisherhttpserver import PublisherHTTPServer

from zope.component.tests.placelesssetup import PlacelessSetup
from zope.component.adapter import provideAdapter

from zope.interfaces.i18n import IUserPreferredCharsets

from zope.publisher.http import IHTTPRequest
from zope.publisher.http import HTTPCharsets
from zope.publisher.browser import BrowserRequest
from zope.publisher.base import DefaultPublication
from zope.interfaces.publisher import Redirect, Retry
from Zope.Publisher.HTTP import HTTPRequest

from httplib import HTTPConnection

from time import sleep, time

td = ThreadedTaskDispatcher()

LOCALHOST = '127.0.0.1'

HTTPRequest.STAGGER_RETRIES = 0  # Don't pause.


class Conflict (Exception):
    """
    Pseudo ZODB conflict error.
    """


class PublicationWithConflict(DefaultPublication):

    def handleException(self, object, request, exc_info, retry_allowed=1):
        if exc_info[0] is Conflict and retry_allowed:
            # This simulates a ZODB retry.
            raise Retry(exc_info)
        else:
            DefaultPublication.handleException(self, object, request, exc_info,
                                               retry_allowed)


class tested_object:
    " "
    tries = 0

    def __call__(self, REQUEST):
        return 'URL invoked: %s' % REQUEST.URL

    def redirect_method(self, REQUEST):
        "Generates a redirect using the redirect() method."
        REQUEST.response.redirect("http://somewhere.com/redirect")

    def redirect_exception(self):
        "Generates a redirect using an exception."
        raise Redirect("http://somewhere.com/exception")

    def conflict(self, REQUEST, wait_tries):
        """
        Returns 202 status only after (wait_tries) tries.
        """
        if self.tries >= int(wait_tries):
            raise "Accepted"
        else:
            self.tries += 1
            raise Conflict



class Tests(PlacelessSetup, unittest.TestCase):

    def setUp(self):
        PlacelessSetup.setUp(self)
        provideAdapter(IHTTPRequest, IUserPreferredCharsets, HTTPCharsets)
        obj = tested_object()
        obj.folder = tested_object()
        obj.folder.item = tested_object()

        obj._protected = tested_object()

        pub = PublicationWithConflict(obj)

        def request_factory(input_stream, output_steam, env):
            request = BrowserRequest(input_stream, output_steam, env)
            request.setPublication(pub)
            return request

        td.setThreadCount(4)
        # Bind to any port on localhost.
        self.server = PublisherHTTPServer(request_factory, 'Browser',
                                          LOCALHOST, 0, task_dispatcher=td)
        self.port = self.server.socket.getsockname()[1]
        self.run_loop = 1
        self.thread = Thread(target=self.loop)
        self.thread.start()
        sleep(0.1)  # Give the thread some time to start.

    def tearDown(self):
        self.run_loop = 0
        self.thread.join()
        td.shutdown()
        self.server.close()

    def loop(self):
        while self.run_loop:
            poll(0.1, socket_map)

    def testResponse(self, path='/', status_expected=200,
                     add_headers=None, request_body=''):
        h = HTTPConnection(LOCALHOST, self.port)
        h.putrequest('GET', path)
        h.putheader('Accept', 'text/plain')
        if add_headers:
            for k, v in add_headers.items():
                h.putheader(k, v)
        if request_body:
            h.putheader('Content-Length', str(int(len(request_body))))
        h.endheaders()
        if request_body:
            h.send(request_body)
        response = h.getresponse()
        length = int(response.getheader('Content-Length', '0'))
        if length:
            response_body = response.read(length)
        else:
            response_body = ''

        # Please do not disable the status code check.  It must work.
        self.failUnlessEqual(int(response.status), status_expected)

        self.failUnlessEqual(length, len(response_body))

        if (status_expected == 200):
            if path == '/': path = ''
            expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
                self.port, path)
            self.failUnlessEqual(response_body, expect_response)

    def testDeeperPath(self):
        self.testResponse(path='/folder/item')

    def testNotFound(self):
        self.testResponse(path='/foo/bar', status_expected=404)

    def testUnauthorized(self):
        self.testResponse(path='/_protected', status_expected=401)

    def testRedirectMethod(self):
        self.testResponse(path='/redirect_method', status_expected=302)

    def testRedirectException(self):
        self.testResponse(path='/redirect_exception', status_expected=302)
        self.testResponse(path='/folder/redirect_exception',
                          status_expected=302)

    def testConflictRetry(self):
        # Expect the "Accepted" response since the retries will succeed.
        self.testResponse(path='/conflict?wait_tries=2', status_expected=202)

    def testFailedConflictRetry(self):
        # Expect a "Conflict" response since there will be too many
        # conflicts.
        self.testResponse(path='/conflict?wait_tries=10', status_expected=409)



def test_suite():
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(Tests)

if __name__=='__main__':
    unittest.TextTestRunner().run( test_suite() )