[Zope3-checkins] CVS: Zope3/src/zope/server - __init__.py:1.1.2.1 adjustments.py:1.1.2.1 buffers.py:1.1.2.1 dualmodechannel.py:1.1.2.1 fixedstreamreceiver.py:1.1.2.1 maxsockets.py:1.1.2.1 serverbase.py:1.1.2.1 serverchannelbase.py:1.1.2.1 taskthreads.py:1.1.2.1 utilities.py:1.1.2.1 zlogintegration.py:1.1.2.1

Jim Fulton <jim@zope.com>
Mon, 23 Dec 2002 14:33:20 -0500


Update of /cvs-repository/Zope3/src/zope/server
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/server

Added Files:
      Tag: NameGeddon-branch
	__init__.py adjustments.py buffers.py dualmodechannel.py 
	fixedstreamreceiver.py maxsockets.py serverbase.py 
	serverchannelbase.py taskthreads.py utilities.py 
	zlogintegration.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zope/server/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Zope.Server package.

$Id: __init__.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""

from zope.server.interfaces.interfaces import IDispatcher
from zope.interface.implements import implements

import asyncore

implements(asyncore.dispatcher, IDispatcher, 0)



=== Added File Zope3/src/zope/server/adjustments.py ===
# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.

import zope.server.maxsockets


class Adjustments:
    """This class contains tunable communication parameters.

    You can either change default_adj to adjust parameters for
    all sockets, or you can create a new instance of this class,
    change its attributes, and pass it to the channel constructors.
    """

    # backlog is the argument to pass to socket.listen().
    backlog = 1024

    # recv_bytes is the argument to pass to socket.recv().
    recv_bytes = 8192

    # send_bytes is the maximum number of bytes to send per socket.send() call.
    send_bytes = 8192

    # copy_bytes is the number of bytes to copy from one file to another.
    copy_bytes = 65536

    # Create a tempfile if the pending output data gets larger
    # than outbuf_overflow.  With RAM so cheap, this probably
    # ought to be set to the 16-32 MB range (circa 2001) for
    # good performance with big transfers.  The default is
    # conservative.
    outbuf_overflow = 1050000

    # Create a tempfile if the data received gets larger
    # than inbuf_overflow.
    inbuf_overflow = 525000

    # Stop accepting new connections if too many are already active.
    connection_limit = zope.server.maxsockets.max_select_sockets() - 3  # Safe

    # Minimum seconds between cleaning up inactive channels.
    cleanup_interval = 300

    # Maximum seconds to leave an inactive connection open.
    channel_timeout = 900

    # Boolean: set to 0 to stop logging premature client disconnects.
    log_socket_errors = 1


default_adj = Adjustments()
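

# Illustrative sketch (assumed usage): the two tuning styles described in
# the Adjustments docstring.  SomeChannel is a hypothetical stand-in for any
# channel class whose constructor accepts an adj argument.
def _example_tuning():
    # Style 1: affect every socket by mutating the shared default.
    default_adj.backlog = 256

    # Style 2: give a single channel its own private settings.
    my_adj = Adjustments()
    my_adj.recv_bytes = 16384
    return my_adj   # pass as: SomeChannel(conn, addr, adj=my_adj)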



=== Added File Zope3/src/zope/server/buffers.py ===
# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.


try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO


# COPY_BYTES controls the size of the temporary strings used to
# shuffle data around.
COPY_BYTES = 1 << 18  # 256K

# The maximum number of bytes to buffer in a simple string.
STRBUF_LIMIT = 8192


class FileBasedBuffer:

    remain = 0

    def __init__(self, file, from_buffer=None):
        self.file = file
        if from_buffer is not None:
            from_file = from_buffer.getfile()
            read_pos = from_file.tell()
            from_file.seek(0)
            while 1:
                data = from_file.read(COPY_BYTES)
                if not data:
                    break
                file.write(data)
            self.remain = int(file.tell() - read_pos)
            from_file.seek(read_pos)
            file.seek(read_pos)

    def __len__(self):
        return self.remain

    def append(self, s):
        file = self.file
        read_pos = file.tell()
        file.seek(0, 2)
        file.write(s)
        file.seek(read_pos)
        self.remain = self.remain + len(s)

    def get(self, bytes=-1, skip=0):
        file = self.file
        if not skip:
            read_pos = file.tell()
        if bytes < 0:
            # Read all
            res = file.read()
        else:
            res = file.read(bytes)
        if skip:
            self.remain -= len(res)
        else:
            file.seek(read_pos)
        return res

    def skip(self, bytes, allow_prune=0):
        if self.remain < bytes:
            raise ValueError, (
                "Can't skip %d bytes in buffer of %d bytes" %
                (bytes, self.remain))
        self.file.seek(bytes, 1)
        self.remain = self.remain - bytes

    def newfile(self):
        raise NotImplementedError, 'newfile() must be overridden'

    def prune(self):
        file = self.file
        if self.remain == 0:
            read_pos = file.tell()
            file.seek(0, 2)
            sz = file.tell()
            file.seek(read_pos)
            if sz == 0:
                # Nothing to prune.
                return
        nf = self.newfile()
        while 1:
            data = file.read(COPY_BYTES)
            if not data:
                break
            nf.write(data)
        self.file = nf

    def getfile(self):
        return self.file



class TempfileBasedBuffer(FileBasedBuffer):

    def __init__(self, from_buffer=None):
        FileBasedBuffer.__init__(self, self.newfile(), from_buffer)

    def newfile(self):
        from tempfile import TemporaryFile
        return TemporaryFile('w+b')



class StringIOBasedBuffer(FileBasedBuffer):

    def __init__(self, from_buffer=None):
        if from_buffer is not None:
            FileBasedBuffer.__init__(self, StringIO(), from_buffer)
        else:
            # Shortcut. :-)
            self.file = StringIO()

    def newfile(self):
        return StringIO()



class OverflowableBuffer:
    """
    This buffer implementation has four stages:
    - No data
    - String-based buffer
    - StringIO-based buffer
    - Temporary file storage
    The first two stages are fastest for simple transfers.
    """

    overflowed = 0
    buf = None
    strbuf = ''  # String-based buffer.

    def __init__(self, overflow):
        # overflow is the maximum to be stored in a StringIO buffer.
        self.overflow = overflow

    def __len__(self):
        buf = self.buf
        if buf is not None:
            return len(buf)
        else:
            return len(self.strbuf)

    def _create_buffer(self):
        # print 'creating buffer'
        strbuf = self.strbuf
        if len(strbuf) >= self.overflow:
            self._set_large_buffer()
        else:
            self._set_small_buffer()
        buf = self.buf
        if strbuf:
            buf.append(self.strbuf)
            self.strbuf = ''
        return buf

    def _set_small_buffer(self):
        self.buf = StringIOBasedBuffer(self.buf)
        self.overflowed = 0

    def _set_large_buffer(self):
        self.buf = TempfileBasedBuffer(self.buf)
        self.overflowed = 1

    def append(self, s):
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if len(strbuf) + len(s) < STRBUF_LIMIT:
                self.strbuf = strbuf + s
                return
            buf = self._create_buffer()
        buf.append(s)
        sz = len(buf)
        if not self.overflowed:
            if sz >= self.overflow:
                self._set_large_buffer()

    def get(self, bytes=-1, skip=0):
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if not skip:
                return strbuf
            buf = self._create_buffer()
        return buf.get(bytes, skip)

    def skip(self, bytes, allow_prune=0):
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if allow_prune and bytes == len(strbuf):
                # We could slice instead of converting to
                # a buffer, but that would eat up memory in
                # large transfers.
                self.strbuf = ''
                return
            buf = self._create_buffer()
        buf.skip(bytes, allow_prune)

    def prune(self):
        """
        A potentially expensive operation that removes all data
        already retrieved from the buffer.
        """
        buf = self.buf
        if buf is None:
            self.strbuf = ''
            return
        buf.prune()
        if self.overflowed:
            sz = len(buf)
            if sz < self.overflow:
                # Revert to a faster buffer.
                self._set_small_buffer()

    def getfile(self):
        buf = self.buf
        if buf is None:
            buf = self._create_buffer()
        return buf.getfile()
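

# Illustrative sketch (assumed usage, not authoritative): the stage
# transitions described in the OverflowableBuffer docstring.  The overflow
# limit is set just above STRBUF_LIMIT so every stage is visited.
def _example_stages():
    ob = OverflowableBuffer(overflow=STRBUF_LIMIT + 1000)
    ob.append('a' * 100)                # stage 2: plain Python string
    assert ob.buf is None
    ob.append('b' * STRBUF_LIMIT)       # crosses STRBUF_LIMIT...
    assert isinstance(ob.buf, StringIOBasedBuffer)  # stage 3: StringIO
    ob.append('c' * 2000)               # crosses the overflow limit...
    assert ob.overflowed                # stage 4: temporary file
    first = ob.get(10)                  # peek without consuming
    assert first == 'a' * 10
    ob.skip(10)                         # now actually consume those bytes
    return first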


=== Added File Zope3/src/zope/server/dualmodechannel.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: dualmodechannel.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""

import asyncore
import socket
from time import time
from UserDict import UserDict

from Thread import SelectTrigger
from zope.server.adjustments import default_adj
from zope.server.buffers import OverflowableBuffer


# Create the main trigger if it doesn't exist yet.
if SelectTrigger.the_trigger is None:
    SelectTrigger.the_trigger = SelectTrigger.Trigger()



class DualModeChannel(asyncore.dispatcher):
    """Channel that switches between asynchronous and synchronous mode.

    Call set_sync() before using a channel in a thread other than
    the thread handling the main loop.

    Call set_async() to give the channel back to the thread handling
    the main loop.
    """

    __implements__ = asyncore.dispatcher.__implements__

    # will_close is set to 1 to close the socket.
    will_close = 0

    # boolean: async or sync mode
    async_mode = 1

    def __init__(self, conn, addr, adj=None):
        self.addr = addr
        if adj is None:
            adj = default_adj
        self.adj = adj
        self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
        self.creation_time = time()
        asyncore.dispatcher.__init__(self, conn)

    #
    # ASYNCHRONOUS METHODS
    #

    def handle_close(self):
        self.close()

    def writable(self):
        if not self.async_mode:
            return 0
        return self.will_close or self.outbuf

    def handle_write(self):
        if not self.async_mode:
            return
        self.inner_handle_write()

    def inner_handle_write(self):
        if self.outbuf:
            try:
                self._flush_some()
            except socket.error:
                self.handle_comm_error()
        elif self.will_close:
            self.close()

    def readable(self):
        if not self.async_mode:
            return 0
        return not self.will_close

    def handle_read(self):
        if not self.async_mode:
            return
        self.inner_handle_read()

    def inner_handle_read(self):
        try:
            data = self.recv(self.adj.recv_bytes)
        except socket.error:
            self.handle_comm_error()
            return
        self.received(data)

    def received(self, data):
        """
        Override to receive data in async mode.
        """
        pass

    def handle_comm_error(self):
        """
        Designed for handling communication errors that occur
        during asynchronous operations *only*.  Probably should log
        this, but in a different place.
        """
        self.handle_error()

    def set_sync(self):
        """Switches to synchronous mode.

        The main thread will stop calling received().
        """
        self.async_mode = 0

    #
    # SYNCHRONOUS METHODS
    #

    def write(self, data):
        if data:
            self.outbuf.append(data)
        while len(self.outbuf) >= self.adj.send_bytes:
            # Send what we can without blocking.
            # We propagate errors to the application on purpose
            # (to stop the application if the connection closes).
            if not self._flush_some():
                break

    def flush(self, block=1):
        """Sends pending data.

        If block is true, the socket is put into blocking mode and this
        call pauses the application until all pending output has been
        sent.  If block is false, only the data that can be sent without
        blocking is sent.
        """
        if not block:
            while self._flush_some():
                pass
            return
        blocked = 0
        try:
            while self.outbuf:
                # We propagate errors to the application on purpose.
                if not blocked:
                    self.socket.setblocking(1)
                    blocked = 1
                self._flush_some()
        finally:
            if blocked:
                self.socket.setblocking(0)

    def set_async(self):
        """Switches to asynchronous mode.

        The main thread will begin calling received() again.
        """
        self.async_mode = 1
        self.pull_trigger()

    #
    # METHODS USED IN BOTH MODES
    #

    def pull_trigger(self):
        """Wakes up the main loop.
        """
        SelectTrigger.the_trigger.pull_trigger()

    def _flush_some(self):
        """Flushes data.

        Returns 1 if some data was sent."""
        outbuf = self.outbuf
        if outbuf and self.connected:
            chunk = outbuf.get(self.adj.send_bytes)
            num_sent = self.send(chunk)
            if num_sent:
                outbuf.skip(num_sent, 1)
                return 1
        return 0

    def close_when_done(self):
        # We might be able to close immediately.
        while self._flush_some():
            pass
        if not self.outbuf:
            # Quick exit.
            self.close()
        else:
            # Wait until outbuf is flushed.
            self.will_close = 1
            if not self.async_mode:
                self.async_mode = 1
                self.pull_trigger()


allocate_lock = None


class SimultaneousModeChannel (DualModeChannel):
    """Layer on top of DualModeChannel that allows communication in
    both the main thread and other threads at the same time.

    The channel operates in synchronous mode with an asynchronous
    helper.  The asynchronous callbacks empty the output buffer
    and fill the input buffer.
    """

    __implements__ = asyncore.dispatcher.__implements__


    def __init__(self, conn, addr, adj=None):
        global allocate_lock
        if allocate_lock is None:
            from thread import allocate_lock

        # writelock protects all accesses to outbuf, since reads and
        # writes of buffers in this class need to be serialized.
        writelock = allocate_lock()
        self._writelock_acquire = writelock.acquire
        self._writelock_release = writelock.release
        self._writelock_locked = writelock.locked
        DualModeChannel.__init__(self, conn, addr, adj)

    #
    # ASYNCHRONOUS METHODS
    #

    def writable(self):
        return self.will_close or (
            self.outbuf and not self._writelock_locked())

    def handle_write(self):
        if not self._writelock_acquire(0):
            # A synchronous method is writing.
            return
        try:
            self.inner_handle_write()
        finally:
            self._writelock_release()

    def readable(self):
        return not self.will_close

    def handle_read(self):
        self.inner_handle_read()

    def set_sync(self):
        pass

    #
    # SYNCHRONOUS METHODS
    #

    def write(self, data):
        self._writelock_acquire()
        try:
            DualModeChannel.write(self, data)
        finally:
            self._writelock_release()

    def flush(self, block=1):
        self._writelock_acquire()
        try:
            DualModeChannel.flush(self, block)
        finally:
            self._writelock_release()

    def set_async(self):
        pass

    #
    # METHODS USED IN BOTH MODES
    #

    def close_when_done(self):
        self.will_close = 1
        self.pull_trigger()
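

# Illustrative sketch (assumed usage): the hand-off protocol described in
# the DualModeChannel docstring, as seen from a worker thread.  The channel
# is assumed to have been created by the main asyncore loop, which must
# already have called set_sync() before handing the work to this thread
# (see ServerChannelBase.start_task).
def _example_worker(channel, response_data):
    channel.write(response_data)   # buffered; may flush without blocking
    channel.flush()                # block until all pending data is sent
    channel.set_async()            # give the channel back to the main loop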




=== Added File Zope3/src/zope/server/fixedstreamreceiver.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: fixedstreamreceiver.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""

from zope.server.interfaces.interfaces import IStreamConsumer


class FixedStreamReceiver:

    __implements__ = IStreamConsumer

    # See Zope.Server.IStreamConsumer.IStreamConsumer
    completed = 0

    def __init__(self, cl, buf):
        self.remain = cl
        self.buf = buf

    ############################################################
    # Implementation methods for interface
    # Zope.Server.IStreamConsumer

    def received(self, data):
        'See Zope.Server.IStreamConsumer.IStreamConsumer'
        rm = self.remain
        if rm < 1:
            self.completed = 1  # Avoid any chance of spinning
            return 0
        datalen = len(data)
        if rm <= datalen:
            self.buf.append(data[:rm])
            self.remain = 0
            self.completed = 1
            return rm
        else:
            self.buf.append(data)
            self.remain -= datalen
            return datalen

    #
    ############################################################

    def getfile(self):
        return self.buf.getfile()
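

# Illustrative sketch (assumed usage): the consumption protocol.  received()
# returns the number of bytes it actually used, and sets ``completed`` once
# the fixed number of bytes has arrived.
def _example_receive():
    from zope.server.buffers import OverflowableBuffer
    receiver = FixedStreamReceiver(10, OverflowableBuffer(1 << 20))
    assert receiver.received('12345') == 5 and not receiver.completed
    # Only 5 more bytes are needed; the trailing 'EXTRA' is left unconsumed.
    assert receiver.received('67890EXTRA') == 5 and receiver.completed
    return receiver.getfile().read()   # -> '1234567890'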


=== Added File Zope3/src/zope/server/maxsockets.py ===
# Medusa max_sockets module.

import socket
import select

# several factors here we might want to test:
# 1) max we can create
# 2) max we can bind
# 3) max we can listen on
# 4) max we can connect

def max_server_sockets():
    sl = []
    while 1:
        try:
            s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
            s.bind (('',0))
            s.listen(5)
            sl.append (s)
        except:
            break
    num = len(sl)
    for s in sl:
        s.close()
    del sl
    return num

def max_client_sockets():
    # make a server socket
    server = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
    server.bind (('', 9999))
    server.listen (5)
    sl = []
    while 1:
        try:
            s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
            s.connect (('', 9999))
            conn, addr = server.accept()
            sl.append ((s,conn))
        except:
            break
    num = len(sl)
    for s,c in sl:
        s.close()
        c.close()
    del sl
    return num

def max_select_sockets():
    sl = []
    while 1:
        try:
            num = len(sl)
            for i in range(int(1 + len(sl) * 0.05)):
                # Increase exponentially.
                s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
                s.bind (('',0))
                s.listen(5)
                sl.append (s)
            select.select(sl,[],[],0)
        except:
            break
    for s in sl:
        s.close()
    del sl
    return num
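

# Illustrative sketch (assumed usage): adjustments.py derives its default
# connection limit from this probe, keeping a small safety margin.
def _example_connection_limit():
    return max_select_sockets() - 3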


=== Added File Zope3/src/zope/server/serverbase.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: serverbase.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""

import asyncore
import logging
import socket

from zope.server.adjustments import default_adj

from zope.server.interfaces.interfaces import IServer


class ServerBase(asyncore.dispatcher, object):
    """Async. server base for launching derivatives of ServerChannelBase.
    """

    __implements__ = asyncore.dispatcher.__implements__, IServer

    channel_class = None    # Override with a channel class.
    SERVER_IDENT = 'Zope.Server.ServerBase'  # Override.

    def __init__(self, ip, port, task_dispatcher=None, adj=None, start=1,
                 hit_log=None, verbose=0):
        if adj is None:
            adj = default_adj
        self.adj = adj
        asyncore.dispatcher.__init__(self)
        self.port = port
        self.task_dispatcher = task_dispatcher
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((ip, port))
        self.verbose = verbose
        self.hit_log = hit_log
        self.server_name = self.computeServerName(ip)
        self.logger = logging.getLogger(self.__class__.__name__)

        if start:
            self.accept_connections()

    def log(self, message):
        # Override asyncore's default log()
        self.logger.info(message)

    level_mapping = {
        'info': logging.INFO,
        'error': logging.ERROR,
        'warning': logging.WARN,
        }

    def log_info(self, message, type='info'):
        self.logger.log(self.level_mapping.get(type, logging.INFO), message)

    def computeServerName(self, ip=''):
        if ip:
            server_name = str(ip)
        else:
            server_name = str(socket.gethostname())
        # Convert to a host name if necessary.
        is_hostname = 0
        for c in server_name:
            if c != '.' and not c.isdigit():
                is_hostname = 1
                break
        if not is_hostname:
            if self.verbose:
                self.log_info('Computing hostname', 'info')
            try:
                server_name = socket.gethostbyaddr(server_name)[0]
            except socket.error:
                if self.verbose:
                    self.log_info('Cannot do reverse lookup', 'info')
        return server_name

    def accept_connections(self):
        self.accepting = 1
        self.socket.listen(self.adj.backlog)  # Circumvent asyncore's NT limit
        if self.verbose:
            self.log_info('%s started.\n'
                          '\tHostname: %s\n\tPort: %d' % (
                self.SERVER_IDENT,
                self.server_name,
                self.port
                ))


    def addTask(self, task):
        td = self.task_dispatcher
        if td is not None:
            td.addTask(task)
        else:
            task.service()

    ############################################################
    # Implementation methods for interface
    # Zope.Server.IDispatcher.IDispatcher

    def readable(self):
        'See Zope.Server.IDispatcher.IDispatcher'
        return (self.accepting and
                len(asyncore.socket_map) < self.adj.connection_limit)

    def writable(self):
        'See Zope.Server.IDispatcher.IDispatcher'
        return 0

    ######################################
    # from: Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler

    def handle_read(self):
        'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
        pass

    def handle_connect(self):
        'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
        pass

    def handle_accept(self):
        'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
        try:
            v = self.accept()
            if v is None:
                return
            conn, addr = v
        except socket.error:
            # Linux: On rare occasions we get a bogus socket back from
            # accept.  socketmodule.c:makesockaddr complains that the
            # address family is unknown.  We don't want the whole server
            # to shut down because of this.
            if self.adj.log_socket_errors:
                self.log_info ('warning: server accept() threw an exception',
                               'warning')
            return
        self.channel_class(self, conn, addr, self.adj)

    #
    ############################################################
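

# Illustrative sketch (assumed usage, not authoritative): a concrete server
# is built by plugging a channel class into ServerBase.  MyChannel stands in
# for a real ServerChannelBase subclass such as an HTTP channel.
def _example_server(MyChannel):
    from zope.server.taskthreads import ThreadedTaskDispatcher

    class MyServer(ServerBase):
        channel_class = MyChannel
        SERVER_IDENT = 'Example.Server'

    td = ThreadedTaskDispatcher()
    td.setThreadCount(4)
    # start defaults to 1, so the server binds and accepts immediately;
    # asyncore.loop() must still be run to service the socket map.
    return MyServer('127.0.0.1', 8080, task_dispatcher=td)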



=== Added File Zope3/src/zope/server/serverchannelbase.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: serverchannelbase.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""

import os
import time
import sys
import asyncore
from thread import allocate_lock

# Set the ZOPE_SERVER_SIMULT_MODE environment variable to turn on the
# experimental simultaneous channel mode, which may improve or degrade
# throughput depending on load characteristics.
if os.environ.get('ZOPE_SERVER_SIMULT_MODE'):
    from zope.server.dualmodechannel import SimultaneousModeChannel as \
         ChannelBaseClass
else:
    from zope.server.dualmodechannel import DualModeChannel as ChannelBaseClass

from zope.server.interfaces.interfaces import IServerChannel

# Synchronize access to the "running_tasks" attributes.
running_lock = allocate_lock()


class ServerChannelBase(ChannelBaseClass, object):
    """Base class for a high-performance, mixed-mode server-side channel.
    """

    __implements__ = ChannelBaseClass.__implements__, IServerChannel


    parser_class = None       # Subclasses must provide a parser class
    task_class = None         # ... and a task class.

    active_channels = {}        # Class-specific channel tracker
    next_channel_cleanup = [0]  # Class-specific cleanup time

    proto_request = None      # A request parser instance
    ready_requests = None     # A list
    # ready_requests must always be empty when not running tasks.
    last_activity = 0         # Time of last activity
    running_tasks = 0         # boolean: true when any task is being executed

    #
    # ASYNCHRONOUS METHODS (incl. __init__)
    #

    def __init__(self, server, conn, addr, adj=None):
        ChannelBaseClass.__init__(self, conn, addr, adj)
        self.server = server
        self.last_activity = t = self.creation_time
        self.check_maintenance(t)


    def add_channel(self, map=None):
        """This hook keeps track of opened HTTP channels.
        """
        ChannelBaseClass.add_channel(self, map)
        self.__class__.active_channels[self._fileno] = self


    def del_channel(self, map=None):
        """This hook keeps track of closed HTTP channels.
        """
        ChannelBaseClass.del_channel(self, map)
        ac = self.__class__.active_channels
        fd = self._fileno
        if fd in ac:
            del ac[fd]


    def check_maintenance(self, now):
        """Performs maintenance if necessary.
        """
        ncc = self.__class__.next_channel_cleanup
        if now < ncc[0]:
            return
        ncc[0] = now + self.adj.cleanup_interval
        self.maintenance()


    def maintenance(self):
        """Kills off dead connections.
        """
        self.kill_zombies()


    def kill_zombies(self):
        """Closes connections that have not had any activity in a while.

        The timeout is configured through adj.channel_timeout (seconds).
        """
        now = time.time()
        cutoff = now - self.adj.channel_timeout
        for channel in self.active_channels.values():
            if (channel is not self and not channel.running_tasks and
                channel.last_activity < cutoff):
                channel.close()


    def received(self, data):
        """Receive input asynchronously and send requests to
        receivedCompleteRequest().
        """
        preq = self.proto_request
        while data:
            if preq is None:
                preq = self.parser_class(self.adj)
            n = preq.received(data)
            if preq.completed:
                # The request is ready to use.
                if not preq.empty:
                    self.receivedCompleteRequest(preq)
                preq = None
                self.proto_request = None
            else:
                self.proto_request = preq
            if n >= len(data):
                break
            data = data[n:]


    def receivedCompleteRequest(self, req):
        """If there are tasks running or requests on hold, queue
        the request, otherwise execute it.
        """
        do_now = 0
        running_lock.acquire()
        try:
            if self.running_tasks:
                # A task thread is working.  It will read from the queue
                # when it is finished.
                rr = self.ready_requests
                if rr is None:
                    rr = []
                    self.ready_requests = rr
                rr.append(req)
            else:
                # Do it now.
                do_now = 1
        finally:
            running_lock.release()
        if do_now:
            task = self.process_request(req)
            if task is not None:
                self.start_task(task)


    def start_task(self, task):
        """Starts the given task.

        *** For thread safety, this should only be called from the main
        (async) thread. ***"""
        if self.running_tasks:
            # Can't start while another task is running!
            # Otherwise two threads would work on the queue at the same time.
            raise RuntimeError, 'Already executing tasks'
        self.running_tasks = 1
        self.set_sync()
        self.server.addTask(task)


    def handle_error(self):
        """Handles program errors (not communication errors)
        """
        t, v = sys.exc_info()[:2]
        if t is SystemExit or t is KeyboardInterrupt:
            raise t, v
        asyncore.dispatcher.handle_error(self)


    def handle_comm_error(self):
        """Handles communication errors (not program errors)
        """
        if self.adj.log_socket_errors:
            self.handle_error()
        else:
            # Ignore socket errors.
            self.close()


    #
    # SYNCHRONOUS METHODS
    #

    def end_task(self, close):
        """Called at the end of a task and may launch another task.
        """
        if close:
            # Note that self.running_tasks is left on, which has the
            # side effect of preventing further requests from being
            # serviced even if more appear.  A good thing.
            self.close_when_done()
            return
        # Process requests held in the queue, if any.
        while 1:
            req = None
            running_lock.acquire()
            try:
                rr = self.ready_requests
                if rr:
                    req = rr.pop(0)
                else:
                    # No requests to process.
                    self.running_tasks = 0
            finally:
                running_lock.release()

            if req is not None:
                task = self.process_request(req)
                if task is not None:
                    # Add the new task.  It will service the queue.
                    self.server.addTask(task)
                    break
                # else check the queue again.
            else:
                # Idle -- Wait for another request on this connection.
                self.set_async()
                break


    #
    # BOTH MODES
    #

    def process_request(self, req):
        """Returns a task to execute or None if the request is quick and
        can be processed in the main thread.

        Override to handle some requests in the main thread.
        """
        return self.task_class(self, req)
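

# Illustrative sketch (assumed usage, not authoritative): the subclass
# contract.  A parser accumulates request bytes and reports how many it
# consumed; the channel decides whether a completed request needs a task.
# Both classes below are hypothetical stand-ins for real protocol code.
class _ExampleParser:
    completed = 0
    empty = 0

    def __init__(self, adj):
        self.data = ''

    def received(self, data):
        # Treat everything up to the first newline as one "request".
        pos = data.find('\n')
        if pos < 0:
            self.data = self.data + data
            return len(data)
        self.data = self.data + data[:pos + 1]
        self.completed = 1
        return pos + 1


class _ExampleChannel(ServerChannelBase):
    parser_class = _ExampleParser

    def process_request(self, req):
        # Handle the request directly in the main thread; returning None
        # tells the framework that no task needs to be scheduled.
        self.write('echo: ' + req.data)
        self.close_when_done()
        return None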




=== Added File Zope3/src/zope/server/taskthreads.py ===
# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.


import sys
from Queue import Queue, Empty
from thread import allocate_lock, start_new_thread
from time import time, sleep
import logging

from zope.server.interfaces.interfaces import ITaskDispatcher



class ThreadedTaskDispatcher:

    __implements__ = ITaskDispatcher

    stop_count = 0  # Number of threads that will stop soon.

    def __init__(self):
        self.threads = {}  # { thread number -> 1 }
        self.queue = Queue()
        self.thread_mgmt_lock = allocate_lock()

    def handlerThread(self, thread_no):
        threads = self.threads
        try:
            while threads.get(thread_no):
                task = self.queue.get()
                if task is None:
                    # Special value: kill this thread.
                    break
                try:
                    task.service()
                except:
                    logging.exception('Exception during task')
        finally:
            mlock = self.thread_mgmt_lock
            mlock.acquire()
            try:
                self.stop_count -= 1
                try: del threads[thread_no]
                except KeyError: pass
            finally:
                mlock.release()

    def setThreadCount(self, count):
        mlock = self.thread_mgmt_lock
        mlock.acquire()
        try:
            threads = self.threads
            thread_no = 0
            running = len(threads) - self.stop_count
            while running < count:
                # Start threads.
                while thread_no in threads:
                    thread_no = thread_no + 1
                threads[thread_no] = 1
                running += 1
                start_new_thread(self.handlerThread, (thread_no,))
                thread_no = thread_no + 1
            if running > count:
                # Stop threads.
                to_stop = running - count
                self.stop_count += to_stop
                for n in range(to_stop):
                    self.queue.put(None)
                    running -= 1
        finally:
            mlock.release()

    def addTask(self, task):
        if task is None:
            raise ValueError, "No task passed to addTask()."
        # assert ITask.isImplementedBy(task)
        try:
            task.defer()
            self.queue.put(task)
        except:
            task.cancel()
            raise

    def shutdown(self, cancel_pending=1, timeout=5):
        self.setThreadCount(0)
        # Ensure the threads shut down.
        threads = self.threads
        expiration = time() + timeout
        while threads:
            if time() >= expiration:
                logging.error("%d thread(s) still running" % len(threads))
                break
            sleep(0.1)
        if cancel_pending:
            # Cancel remaining tasks.
            try:
                queue = self.queue
                while not queue.empty():
                    task = queue.get()
                    if task is not None:
                        task.cancel()
            except Empty:
                pass

    def getPendingTasksEstimate(self):
        return self.queue.qsize()
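

# Illustrative sketch (assumed usage): the dispatcher life cycle.  The task
# class is a hypothetical stand-in; real tasks come from a server's
# task_class and implement the same defer()/service()/cancel() protocol.
class _ExampleTask:
    def defer(self):
        pass
    def cancel(self):
        pass
    def service(self):
        print 'servicing a task'

def _example_dispatch():
    td = ThreadedTaskDispatcher()
    td.setThreadCount(2)          # start two worker threads
    td.addTask(_ExampleTask())    # queued, then serviced by a worker
    td.shutdown()                 # stop workers, cancel anything pending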


=== Added File Zope3/src/zope/server/utilities.py ===
# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.


def find_double_newline(s):
    """Returns the position just after a double newline in the given string."""
    pos1 = s.find('\n\r\n')  # One kind of double newline
    if pos1 >= 0:
        pos1 += 3
    pos2 = s.find('\n\n')    # Another kind of double newline
    if pos2 >= 0:
        pos2 += 2

    if pos1 >= 0:
        if pos2 >= 0:
            return min(pos1, pos2)
        else:
            return pos1
    else:
        return pos2
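

# Two worked examples (illustrative only).  The first result points just
# past the blank line that ends an HTTP header block, i.e. at the first
# byte of the body; -1 means no double newline was found.
def _example_find():
    assert find_double_newline('GET / HTTP/1.0\r\n\r\nbody') == 18
    assert find_double_newline('partial header') == -1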




=== Added File Zope3/src/zope/server/zlogintegration.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Make asyncore log to the logging module.

As a side effect of importing this module, asyncore's logging will be
redirected to the logging module.

$Id: zlogintegration.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""

import logging

logger = logging.getLogger("Zope.Server")

severity = {
    'info': logging.INFO,
    'warning': logging.WARN,
    'error': logging.ERROR,
    }

def log_info(self, message, type='info'):
    logger.log(severity.get(type, logging.INFO), message)

import asyncore
asyncore.dispatcher.log_info = log_info
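

# Illustrative sketch (assumed usage): importing this module anywhere before
# the server starts is all that is needed; asyncore dispatchers then report
# through the "Zope.Server" logger configured above.
def _example_setup():
    import logging
    logging.basicConfig()
    import zope.server.zlogintegration   # side effect: patches asyncore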