[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/tests - testHTTPServer2.py:1.1.2.1

Shane Hathaway shane@digicool.com
Mon, 26 Nov 2001 09:57:24 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server/tests
In directory cvs.zope.org:/tmp/cvs-serv4654/tests

Added Files:
      Tag: Zope-3x-branch
	testHTTPServer2.py 
Log Message:
- Better in sync with HTTP spec.

- Pipelineable without making unnecessary buffers

- Added ZPL

- Moved task management out of dual_mode_channel



=== Added File Zope3/lib/python/Zope/Server/tests/testHTTPServer2.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
# FOR A PARTICULAR PURPOSE.

import unittest
from asyncore import socket_map, poll

from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.HTTPServer2 import http_server
from Zope.Server.dual_mode_channel import pull_trigger

from httplib import HTTPConnection

from time import sleep

import sys
sys.setcheckinterval(120)

tasks = ThreadedTaskDispatcher()


class Tests(unittest.TestCase):

    def setUp(self):
        tasks.setThreadCount(4)
        # Select any port on localhost
        self.server = http_server('127.0.0.1', 0, tasks=tasks)
        self.port = self.server.socket.getsockname()[1]
        self.run_loop = 1
        self.counter = 0
        self.thread = Thread(target=self.loop)
        self.thread.start()
        sleep(0.1)  # Let the thread start.

    def tearDown(self):
        self.run_loop = 0
        tasks.shutdown()
        self.server.close()
        self.thread.join()

    def loop(self):
        while self.run_loop:
            self.counter = self.counter + 1
            #print 'loop', self.counter
            poll(2.0, socket_map)

    def testOkResponse(self, h=None, add_headers=None):
        if h is None:
            h = HTTPConnection('127.0.0.1', self.port)
        h.putrequest('GET', '/')
        h.putheader('Accept', 'text/plain')
        if add_headers:
            for k, v in add_headers.items():
                h.putheader(k, v)
        h.endheaders()
        response = h.getresponse()
        self.failUnlessEqual(int(response.status), 200)
        length = int(response.getheader('Content-Length'))
        data = response.read()
        self.failUnlessEqual(length, len(data))

    def testMultipleRequests(self):
        h = HTTPConnection('127.0.0.1', self.port)
        for n in range(3):
            self.testOkResponse(h)
        self.testOkResponse(h, {'Connection': 'close'})

    def testManyClients(self):
        conns = []
        for n in range(120):  # Linux kernel (2.4.8) doesn't like > 128 ?
            #print 'open', n, clock()
            h = HTTPConnection('127.0.0.1', self.port)
            #h.debuglevel = 1
            h.putrequest('GET', '/')
            h.putheader('Accept', 'text/plain')
            h.endheaders()
            conns.append(h)
            # If you uncomment the next line, you can raise the
            # number of connections much higher without running
            # into delays.
            #sleep(0.01)
        responses = []
        for h in conns:
            response = h.getresponse()
            self.failUnlessEqual(response.status, 200)
            responses.append(response)
        for response in responses:
            response.read()
            


def test_suite():
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(Tests)

if __name__=='__main__':
    unittest.TextTestRunner().run( test_suite() )