[Zope-Checkins] CVS: Zope/lib/python/ZServer/medusa/thread - __init__.py:1.1.2.1 pi_module.py:1.1.2.1 select_trigger.py:1.1.2.1 test_module.py:1.1.2.1 thread_channel.py:1.1.2.1 thread_handler.py:1.1.2.1

Chris McDonough chrism@zope.com
Tue, 17 Sep 2002 01:16:11 -0400


Update of /cvs-repository/Zope/lib/python/ZServer/medusa/thread
In directory cvs.zope.org:/tmp/cvs-serv12650/lib/python/ZServer/medusa/thread

Added Files:
      Tag: chrism-install-branch
	__init__.py pi_module.py select_trigger.py test_module.py 
	thread_channel.py thread_handler.py 
Log Message:
Moved ZServer into lib/python.


=== Added File Zope/lib/python/ZServer/medusa/thread/__init__.py ===
# make thread appear as a package



=== Added File Zope/lib/python/ZServer/medusa/thread/pi_module.py ===
# -*- Mode: Python; tab-width: 4 -*-

# [reworking of the version in Python-1.5.1/Demo/scripts/pi.py]

# Print digits of pi forever.
#
# The algorithm, using Python's 'long' integers ("bignums"), works
# with continued fractions, and was conceived by Lambert Meertens.
#
# See also the ABC Programmer's Handbook, by Geurts, Meertens & Pemberton,
# published by Prentice-Hall (UK) Ltd., 1990.

import string

class StopException (Exception):
    # BUGFIX: this used to be the string constant "Stop!"; raising strings
    # is deprecated (and later removed), so use a real exception class.
    # Callers that write 'except StopException' continue to work.
    "Raised internally to stop digit generation."

def go (file):
    """Generate decimal digits of pi forever, writing one digit per call
    to file.write().

    Uses the Lambert Meertens continued-fraction scheme over bignums
    (see the module header).  Generation stops as soon as file.write()
    returns a true value.
    """
    try:
        k, a, b, a1, b1 = 2, 4, 1, 12, 4
        while 1:
            # next approximation of the continued fraction
            p, q, k = k*k, 2*k+1, k+1
            a, b, a1, b1 = a1, b1, p*a+q*a1, p*b+q*b1
            # emit the digits on which the two successive approximations
            # agree; '//' is explicit floor division on the bignums.
            d, d1 = a//b, a1//b1
            while d == d1:
                if file.write (str(int(d))):
                    raise StopException
                a, a1 = 10*(a%b), 10*(a1%b1)
                d, d1 = a//b, a1//b1
    except StopException:
        return
        
class line_writer:

    "partition the endless line into 80-character ones"

    def __init__ (self, file, digit_limit=10000):
        # underlying file object that receives complete 80-column lines
        self.file = file
        # digits received but not yet flushed as a full line
        self.buffer = ''
        # number of digits written out so far
        self.count = 0
        # once count exceeds this, write() asks the producer to stop
        self.digit_limit = digit_limit

    def write (self, data):
        """Accumulate data; emit one 80-character CRLF-terminated line
        once enough has arrived.  Return 1 to tell the producer to stop
        (more than digit_limit digits emitted), else 0."""
        self.buffer = self.buffer + data
        if len(self.buffer) > 80:
            line = self.buffer[:80]
            self.buffer = self.buffer[80:]
            self.file.write (line + '\r\n')
            self.count = self.count + 80
        if self.count > self.digit_limit:
            return 1
        return 0
            
def main (env, stdin, stdout):
    """CGI-style entry point: emit digits of pi as text/plain.

    The digit count is taken from the third '/'-separated component of
    REQUEST_URI; when absent, 5000 digits are produced.
    """
    pieces = string.split (env['REQUEST_URI'], '/')
    if len(pieces) < 3:
        ndigits = 5000
    else:
        ndigits = string.atoi (pieces[2])
    stdout.write ('Content-Type: text/plain\r\n\r\n')
    go (line_writer (stdout, ndigits))


=== Added File Zope/lib/python/ZServer/medusa/thread/select_trigger.py ===
# -*- Mode: Python; tab-width: 4 -*-

VERSION_STRING = "$Id: select_trigger.py,v 1.1.2.1 2002/09/17 05:16:10 chrism Exp $"

import asyncore
import asynchat

import os
import socket
import string
import thread

if os.name == 'posix':

    class trigger (asyncore.file_dispatcher):
    
        "Wake up a call to select() running in the main thread"
        
        # This is useful in a context where you are using Medusa's I/O
        # subsystem to deliver data, but the data is generated by another
        # thread.  Normally, if Medusa is in the middle of a call to
        # select(), new output data generated by another thread will have
        # to sit until the call to select() either times out or returns.
        # If the trigger is 'pulled' by another thread, it should immediately
        # generate a READ event on the trigger object, which will force the
        # select() invocation to return.
        
        # A common use for this facility: letting Medusa manage I/O for a
        # large number of connections; but routing each request through a
        # thread chosen from a fixed-size thread pool.  When a thread is
        # acquired, a transaction is performed, but output data is
        # accumulated into buffers that will be emptied more efficiently
        # by Medusa. [picture a server that can process database queries
        # rapidly, but doesn't want to tie up threads waiting to send data
        # to low-bandwidth connections]
        
        # The other major feature provided by this class is the ability to
        # move work back into the main thread: if you call pull_trigger()
        # with a thunk argument, when select() wakes up and receives the
        # event it will call your thunk from within that thread.  The main
        # purpose of this is to remove the need to wrap thread locks around
        # Medusa's data structures, which normally do not need them.  [To see
        # why this is true, imagine this scenario: A thread tries to push some
        # new data onto a channel's outgoing data queue at the same time that
        # the main thread is trying to remove some]
        
        def __init__ (self):
            r, w = os.pipe()
            self.trigger = w
            asyncore.file_dispatcher.__init__ (self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            
        def __repr__ (self):
            return '<select-trigger (pipe) at %x>' % id(self)
            
        def readable (self):
            return 1
            
        def writable (self):
            return 0
            
        def handle_connect (self):
            pass
            
        def pull_trigger (self, thunk=None):
                # print 'PULL_TRIGGER: ', len(self.thunks)
            if thunk:
                try:
                    self.lock.acquire()
                    self.thunks.append (thunk)
                finally:
                    self.lock.release()
            os.write (self.trigger, 'x')
            
        def handle_read (self):
            self.recv (8192)
            try:
                self.lock.acquire()
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
                self.thunks = []
            finally:
                self.lock.release()
                
else:


    # win32-safe version

    class trigger (asyncore.dispatcher):
    
        address = ('127.9.9.9', 19999)
        
        def __init__ (self):
            a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
            w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
            
            # set TCP_NODELAY to true to avoid buffering
            w.setsockopt(socket.IPPROTO_TCP, 1, 1)
            
            # tricky: get a pair of connected sockets
            host='127.0.0.1'
            port=19999
            while 1:
                try:
                    self.address=(host, port)
                    a.bind(self.address)
                    break
                except:
                    if port <= 19950:
                        raise 'Bind Error', 'Cannot bind trigger!'
                    port=port - 1
                    
            a.listen (1)
            w.setblocking (0)
            try:
                w.connect (self.address)
            except:
                pass
            r, addr = a.accept()
            a.close()
            w.setblocking (1)
            self.trigger = w
            
            asyncore.dispatcher.__init__ (self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._trigger_connected = 0
            
        def __repr__ (self):
            return '<select-trigger (loopback) at %x>' % id(self)
            
        def readable (self):
            return 1
            
        def writable (self):
            return 0
            
        def handle_connect (self):
            pass
            
        def pull_trigger (self, thunk=None):
            if thunk:
                try:
                    self.lock.acquire()
                    self.thunks.append (thunk)
                finally:
                    self.lock.release()
            self.trigger.send ('x')
            
        def handle_read (self):
            self.recv (8192)
            try:
                self.lock.acquire()
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
                self.thunks = []
            finally:
                self.lock.release()
                
                
the_trigger = None

class trigger_file:

    "A 'triggered' file object"

    # flush to the parent channel once this much data has accumulated
    buffer_size = 4096

    def __init__ (self, parent):
        # lazily create the single, module-wide trigger instance
        global the_trigger
        if the_trigger is None:
            the_trigger = trigger()
        self.parent = parent
        self.buffer = ''

    def write (self, data):
        # accumulate; once past buffer_size, hand the data to the main
        # thread (via the trigger) to be pushed on the parent channel
        self.buffer = self.buffer + data
        if len(self.buffer) > self.buffer_size:
            d = self.buffer
            self.buffer = ''
            the_trigger.pull_trigger (
                lambda d=d, p=self.parent: p.push (d)
                )

    def writeline (self, line):
        self.write (line + '\r\n')

    def writelines (self, lines):
        self.write (string.joinfields (lines, '\r\n') + '\r\n')

    def flush (self):
        # push any buffered data through the trigger immediately
        if self.buffer:
            d = self.buffer
            self.buffer = ''
            the_trigger.pull_trigger (
                lambda p=self.parent, d=d: p.push (d)
                )

    def softspace (self, *args):
        pass

    def close (self):
        # in a derived class, you may want to call trigger_close() instead.
        self.flush()
        self.parent = None

    def trigger_close (self):
        # flush remaining data and close the parent channel, both from
        # within the main (select) thread
        d = self.buffer
        self.buffer = ''
        p = self.parent
        self.parent = None
        the_trigger.pull_trigger (
            lambda p=p, d=d: (p.push(d), p.close_when_done())
            )
        
if __name__ == '__main__':

    import time
    
    # Demo worker: writes a numbered line every 5 seconds, n times, then
    # closes its trigger_file.  Runs in a thread started by thread_parent.
    def thread_function (output_file, i, n):
        print 'entering thread_function'
        while n:
            time.sleep (5)
            output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
            output_file.flush()
            n = n - 1
        output_file.close()
        print 'exiting thread_function'
        
    # One instance per client connection; each received line '<n> ...'
    # spawns a worker thread writing back through a trigger_file.
    class thread_parent (asynchat.async_chat):
    
        def __init__ (self, conn, addr):
            self.addr = addr
            asynchat.async_chat.__init__ (self, conn)
            self.set_terminator ('\r\n')
            self.buffer = ''
            self.count = 0
            
        def collect_incoming_data (self, data):
            self.buffer = self.buffer + data
            
        def found_terminator (self):
            data, self.buffer = self.buffer, ''
            if not data:
                # an empty line shuts the whole demo down
                asyncore.close_all()
                print "done"
                return
            n = string.atoi (string.split (data)[0])
            tf = trigger_file (self)
            self.count = self.count + 1
            thread.start_new_thread (thread_function, (tf, self.count, n))
            
    # Listens on port 9003 and wraps each connection in a thread_parent.
    class thread_server (asyncore.dispatcher):
    
        def __init__ (self, family=socket.AF_INET, address=('', 9003)):
            asyncore.dispatcher.__init__ (self)
            self.create_socket (family, socket.SOCK_STREAM)
            self.set_reuse_addr()
            self.bind (address)
            self.listen (5)
            
        def handle_accept (self):
            conn, addr = self.accept()
            tp = thread_parent (conn, addr)
            
    thread_server()
    #asyncore.loop(1.0, use_poll=1)
    try:
        asyncore.loop ()
    except:
        # make sure all sockets are torn down on ^C or loop failure
        asyncore.close_all()


=== Added File Zope/lib/python/ZServer/medusa/thread/test_module.py ===
# -*- Mode: Python; tab-width: 4 -*-

import pprint

def main (env, stdin, stdout):
    """CGI entry point: render the request environment as an HTML page."""
    header = ('<html><body><h1>Test CGI Module</h1>\r\n'
              '<br>The Environment:<pre>\r\n')
    stdout.write (header)
    # pretty-print the environment dict into the response body
    pprint.pprint (env, stdout)
    stdout.write ('</pre></body></html>\r\n')
    


=== Added File Zope/lib/python/ZServer/medusa/thread/thread_channel.py ===
# -*- Mode: Python; tab-width: 4 -*-

VERSION_STRING = "$Id: thread_channel.py,v 1.1.2.1 2002/09/17 05:16:10 chrism Exp $"

# This will probably only work on Unix.

# The disadvantage to this technique is that it wastes file
# descriptors (especially when compared to select_trigger.py)

# May be possible to do it on Win32, using TCP localhost sockets.
# [does winsock support 'socketpair'?]

import asyncore
import asynchat

import fcntl
import os
import socket
import string
import thread

try:
    from fcntl import F_GETFL, F_SETFL, O_NDELAY
except ImportError:
    from FCNTL import F_GETFL, F_SETFL, O_NDELAY

# this channel slaves off of another one.  it starts a thread which
# pumps its output through the 'write' side of the pipe.  The 'read'
# side of the pipe will then notify us when data is ready.  We push
# this data on the owning data channel's output queue.

class thread_channel (asyncore.file_dispatcher):

    "Relays a worker thread's pipe output onto the owning channel's queue."

    buffer_size = 8192

    def __init__ (self, channel, function, *args):
        # the data channel that will receive the thread's output
        self.parent = channel
        # function to run in the worker thread, plus its extra arguments
        self.function = function
        self.args = args
        rfd, wfd = os.pipe()
        self.pipe = (rfd, wfd)
        # hand the read end to asyncore/medusa
        asyncore.file_dispatcher.__init__ (self, rfd)

    def start (self):
        rfd, wfd = self.pipe

        # The read side of the pipe is set to non-blocking I/O; it is
        # 'owned' by medusa.
        flags = fcntl.fcntl (rfd, F_GETFL, 0)
        fcntl.fcntl (rfd, F_SETFL, flags | O_NDELAY)

        # The write side of the pipe is left in blocking mode; it is
        # 'owned' by the thread, wrapped as a file object so the thread
        # can call write() on it rather than writing to a raw fd.
        thread.start_new_thread (
            self.function,
            # the output file goes in front of the other arguments
            (os.fdopen (wfd, 'w'),) + self.args
            )

    def readable (self):
        return 1

    def writable (self):
        return 0

    def handle_read (self):
        # forward whatever the thread wrote onto the parent channel
        self.parent.push (self.recv (self.buffer_size))

    def handle_close (self):
        # Depending on your intentions, you may want to close
        # the parent channel here.
        self.close()
        
        # Yeah, it's bad when the test code is bigger than the library code.
        
if __name__ == '__main__':

    import time
    
    # Demo worker: writes a numbered line every 5 seconds, n times, then
    # closes its end of the pipe.  Runs in a thread started by start().
    def thread_function (output_file, i, n):
        print 'entering thread_function'
        while n:
            time.sleep (5)
            output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
            output_file.flush()
            n = n - 1
        output_file.close()
        print 'exiting thread_function'
        
    # One instance per client connection; each received line '<n> ...'
    # starts a thread_channel running thread_function.
    class thread_parent (asynchat.async_chat):
    
        def __init__ (self, conn, addr):
            self.addr = addr
            asynchat.async_chat.__init__ (self, conn)
            self.set_terminator ('\r\n')
            self.buffer = ''
            self.count = 0
            
        def collect_incoming_data (self, data):
            self.buffer = self.buffer + data
            
        def found_terminator (self):
            data, self.buffer = self.buffer, ''
            n = string.atoi (string.split (data)[0])
            tc = thread_channel (self, thread_function, self.count, n)
            self.count = self.count + 1
            tc.start()
            
    # Listens on localhost:9003 and wraps each connection in thread_parent.
    class thread_server (asyncore.dispatcher):
    
        def __init__ (self, family=socket.AF_INET, address=('127.0.0.1', 9003)):
            asyncore.dispatcher.__init__ (self)
            self.create_socket (family, socket.SOCK_STREAM)
            self.set_reuse_addr()
            self.bind (address)
            self.listen (5)
            
        def handle_accept (self):
            conn, addr = self.accept()
            tp = thread_parent (conn, addr)
            
    thread_server()
    #asyncore.loop(1.0, use_poll=1)
    asyncore.loop ()


=== Added File Zope/lib/python/ZServer/medusa/thread/thread_handler.py ===
# -*- Mode: Python; tab-width: 4 -*-

import re
import string
import StringIO
import sys

import os
import sys
import time

import counter
import select_trigger
import producers

from default_handler import split_path, unquote, get_header

import fifo

import threading

class request_queue:

    """Thread-safe FIFO used to hand (function, args) work items from the
    async main thread to the worker-thread pool."""

    def __init__ (self):
        self.mon = threading.RLock()
        self.cv = threading.Condition (self.mon)
        self.queue = fifo.fifo()

    def put (self, item):
        """Append an item and wake one waiting worker."""
        self.cv.acquire()
        try:
            self.queue.push (item)
            self.cv.notify()
        finally:
            # BUGFIX: release in a finally clause so an exception raised by
            # push()/notify() cannot leave the condition lock held forever,
            # deadlocking every worker thread.
            self.cv.release()

    def get(self):
        """Block until an item is available, then pop and return it."""
        self.cv.acquire()
        try:
            while not self.queue:
                self.cv.wait()
            return self.queue.pop()
        finally:
            # BUGFIX: as in put(), guarantee the lock is released even if
            # wait()/pop() raises.
            self.cv.release()
        
# map from HTTP header name to CGI environment variable name; after the
# normalization step below, all keys are lower case so lookups can be
# case-insensitive.
header2env = {
        'Content-Length'        : 'CONTENT_LENGTH',
        'Content-Type'          : 'CONTENT_TYPE',
        'Referer'               : 'HTTP_REFERER',
        'User-Agent'            : 'HTTP_USER_AGENT',
        'Accept'                : 'HTTP_ACCEPT',
        'Accept-Charset'        : 'HTTP_ACCEPT_CHARSET',
        'Accept-Language'       : 'HTTP_ACCEPT_LANGUAGE',
        'Host'                  : 'HTTP_HOST',
        'Connection'            : 'CONNECTION_TYPE',
        'Authorization'         : 'HTTP_AUTHORIZATION',
        'Cookie'                : 'HTTP_COOKIE',
        }

# convert keys to lower case for case-insensitive matching.
# BUGFIX: build a fresh dict instead of deleting keys from header2env
# while iterating over it -- mutating a dict during iteration is fragile
# (and an outright error on newer Pythons).
header2env = dict ([(key.lower(), value) for (key, value) in header2env.items()])
    
class thread_output_file (select_trigger.trigger_file):

    # A trigger_file whose close() always goes through trigger_close(),
    # so the final flush and channel shutdown happen in the main
    # (select) thread rather than in the worker thread.

    def close (self):
        self.trigger_close()
        
class script_handler:

    """Medusa request handler that maps URIs to python modules and queues
    the actual CGI-style work for the worker-thread pool."""

    def __init__ (self, queue, document_root=""):
        # maps '/name' -> module whose main(env, stdin, stdout) is invoked
        self.modules = {}
        self.document_root = document_root
        # request_queue shared with the worker threads
        self.queue = queue

    def add_module (self, module, *names):
        """Publish 'module' under each URI in 'names' (default: the
        module's own name)."""
        if not names:
            # BUGFIX: this used to be ["/%s" % module.__name__], which
            # combined with the '/' + name below registered '//name' --
            # a key that match() could never produce from a request URI.
            names = [module.__name__]
        for name in names:
            self.modules['/' + name] = module

    def match (self, request):
        """Return 1 if the first path component of request.uri names a
        registered module (stashing it on request.module), else 0."""
        uri = request.uri

        # keep only the first path component
        i = uri.find ("/", 1)
        if i != -1:
            uri = uri[:i]

        # strip any query string
        i = uri.find ("?", 1)
        if i != -1:
            uri = uri[:i]

        if uri in self.modules:
            request.module = self.modules[uri]
            return 1
        else:
            return 0

    def handle_request (self, request):
        """Build a CGI-style environment from the request, then either
        collect the body (PUT/POST) or continue immediately."""
        [path, params, query, fragment] = split_path (request.uri)

        while path and path[0] == '/':
            path = path[1:]

        if '%' in path:
            path = unquote (path)

        env = {}

        env['REQUEST_URI'] = "/" + path
        env['REQUEST_METHOD'] = request.command.upper()
        env['SERVER_PORT'] = str(request.channel.server.port)
        env['SERVER_NAME'] = request.channel.server.server_name
        env['SERVER_SOFTWARE'] = request['Server']
        env['DOCUMENT_ROOT'] = self.document_root

        parts = path.split ("/")

        # are script_name and path_info ok?
        env['SCRIPT_NAME'] = "/" + parts[0]

        if query and query[0] == "?":
            query = query[1:]

        env['QUERY_STRING'] = query

        env['PATH_INFO'] = "/" + "/".join (parts[1:])
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'            # what should this really be?
        env['REMOTE_ADDR'] = request.channel.addr[0]
        env['REMOTE_HOST'] = request.channel.addr[0]    # TODO: connect to resolver

        for header in request.header:
            [key, value] = header.split (": ", 1)
            key = key.lower()

            if key in header2env:
                if header2env[key]:
                    env[header2env[key]] = value
            else:
                # unknown header: expose it CGI-style as HTTP_<NAME>
                key = 'HTTP_' + "_".join (key.split ("-")).upper()
                env[key] = value

        # remove empty environment variables; iterate over a snapshot of
        # the keys since we delete while walking
        for key in list(env.keys()):
            if env[key] == "" or env[key] is None:
                del env[key]

        try:
            # strip any ':port' suffix from the Host header
            httphost = env['HTTP_HOST']
            env['HTTP_HOST'] = httphost.split (":")[0]
        except KeyError:
            pass

        if request.command in ('put', 'post'):
            # PUT data requires a correct Content-Length: header
            # (though I bet with http/1.1 we can expect chunked encoding)
            request.collector = collector (self, request, env)
            request.channel.set_terminator (None)
        else:
            sin = StringIO.StringIO ('')
            self.continue_request (sin, request, env)

    def continue_request (self, stdin, request, env):
        """Queue (module.main, (env, stdin, stdout)) for a worker thread,
        with stdout routed back through the channel via the trigger."""
        stdout = header_scanning_file (
                request,
                thread_output_file (request.channel)
                )
        self.queue.put (
                (request.module.main, (env, stdin, stdout))
                )
        
# matches a single 'Name: value' CGI header line
HEADER_LINE = re.compile ('([A-Za-z0-9-]+): ([^\r\n]+)')

# A file wrapper that handles the CGI 'Status:' header hack
# by scanning the output.

class header_scanning_file:

    """Wraps an output file: buffers the CGI script's output until the
    header/body separator is seen, turns any 'Status:' header into the
    HTTP response line, and supplies default headers where missing."""

    def __init__ (self, request, file):
        self.buffer = ''                # output seen before headers complete
        self.request = request
        self.file = file
        self.got_header = 0             # flips to 1 once headers are emitted
        self.bytes_out = counter.counter()

    def write (self, data):
        if self.got_header:
            self._write (data)
        else:
            # CGI scripts may optionally provide extra headers.
            #
            # If they do not, then the output is assumed to be
            # text/html, with an HTTP reply code of '200 OK'.
            #
            # If they do, we need to scan those headers for one in
            # particular: the 'Status:' header, which will tell us
            # to use a different HTTP reply code [like '302 Moved']
            self.buffer = self.buffer + data
            lines = self.buffer.split ('\n')
            # ignore the last piece, it is either empty, or a partial line
            lines = lines[:-1]
            # look for something un-header-like
            for i in range(len(lines)):
                li = lines[i]
                if (not li) or (HEADER_LINE.match (li) is None):
                    # this is either the header separator, or it
                    # is not a header line.
                    self.got_header = 1
                    h = self.build_header (lines[:i])
                    self._write (h)
                    # rejoin the rest of the data
                    d = '\n'.join (lines[i:])
                    self._write (d)
                    self.buffer = ''
                    break

    def build_header (self, lines):
        """Return a complete HTTP header block built from the CGI header
        lines, honoring 'Status:' and defaulting Content-Type."""
        status = '200 OK'
        saw_content_type = 0
        hl = HEADER_LINE
        for line in lines:
            mo = hl.match (line)
            if mo is not None:
                h = mo.group(1).lower()
                if h == 'status':
                    status = mo.group(2)
                elif h == 'content-type':
                    saw_content_type = 1
        lines.insert (0, 'HTTP/1.0 %s' % status)
        lines.append ('Server: ' + self.request['Server'])
        lines.append ('Date: ' + self.request['Date'])
        if not saw_content_type:
            lines.append ('Content-Type: text/html')
        lines.append ('Connection: close')
        return '\r\n'.join (lines) + '\r\n\r\n'

    def _write (self, data):
        # account for every byte that actually reaches the wire
        self.bytes_out.increment (len(data))
        self.file.write (data)

    def writelines(self, list):
        self.write (''.join (list))

    def flush(self):
        pass

    def close (self):
        if not self.got_header:
            # managed to slip through our header detectors
            self._write (self.build_header (['Status: 502', 'Content-Type: text/html']))
            self._write (
                    '<html><h1>Server Error</h1>\r\n'
                    '<b>Bad Gateway:</b> No Header from CGI Script\r\n'
                    '<pre>Data: %s</pre>'
                    '</html>\r\n' % (repr(self.buffer))
                    )
        self.request.log (int(self.bytes_out.as_long()))
        self.file.close()
        self.request.channel.current_request = None
        
        
class collector:

    "gathers input for PUT requests"

    def __init__ (self, handler, request, env):
        self.handler = handler
        self.env = env
        self.request = request
        # body bytes accumulate here until content-length is reached
        self.data = StringIO.StringIO()

        # make sure there's a content-length header
        self.cl = request.get_header ('content-length')

        if not self.cl:
            # 411: Length Required
            request.error (411)
            return
        else:
            # IDIOM: int() replaces the long-deprecated string.atoi()
            self.cl = int(self.cl)

    def collect_incoming_data (self, data):
        """Accumulate body data; once content-length bytes have arrived,
        rewind the buffer and hand the request back to the handler."""
        self.data.write (data)
        if self.data.tell() >= self.cl:
            self.data.seek(0)

            h = self.handler
            r = self.request

            # set the terminator back to the default
            self.request.channel.set_terminator ('\r\n\r\n')
            # drop our references so the request/channel can be reclaimed
            del self.handler
            del self.request

            h.continue_request (self.data, r, self.env)
            
            
class request_loop_thread (threading.Thread):

    "Daemon worker: pops (function, (env, stdin, stdout)) jobs and runs them."

    def __init__ (self, queue):
        threading.Thread.__init__ (self)
        # daemonic, so the process can exit while workers are blocked
        self.setDaemon(1)
        self.queue = queue

    def run (self):
        # loop forever; queue.get() blocks until work arrives
        while 1:
            job = self.queue.get()
            function, (env, stdin, stdout) = job
            function (env, stdin, stdout)
            # closing stdout flushes the response and releases the channel
            stdout.close()
            
            # ===========================================================================
            #							   Testing
            # ===========================================================================
            
if __name__ == '__main__':

    # Smoke test: run a web server on port 7080 with the given number of
    # worker threads, serving the test and pi CGI modules.
    import sys
    
    if len(sys.argv) < 2:
        print 'Usage: %s <worker_threads>' % sys.argv[0]
    else:
        nthreads = string.atoi (sys.argv[1])
        
        import asyncore
        import http_server
        # create a generic web server
        hs = http_server.http_server ('', 7080)
        
        # create a request queue
        q = request_queue()
        
        # create a script handler
        sh = script_handler (q)
        
        # install the script handler on the web server
        hs.install_handler (sh)
        
        # get a couple of CGI modules
        import test_module
        import pi_module
        
        # install the module on the script handler
        sh.add_module (test_module, 'test')
        sh.add_module (pi_module, 'pi')
        
        # fire up the worker threads
        for i in range (nthreads):
            rt = request_loop_thread (q)
            rt.start()
            
        # start the main event loop
        asyncore.loop()