[Zope3-checkins] CVS: Zope3/src/zope/server - selecttrigger.py:1.1.2.1 __init__.py:1.1.2.2 adjustments.py:1.1.2.2 fixedstreamreceiver.py:1.1.2.2 serverbase.py:1.1.2.2 serverchannelbase.py:1.1.2.2 taskthreads.py:1.1.2.2

Guido van Rossum guido@python.org
Mon, 23 Dec 2002 16:45:58 -0500


Update of /cvs-repository/Zope3/src/zope/server
In directory cvs.zope.org:/tmp/cvs-serv13315

Modified Files:
      Tag: NameGeddon-branch
	__init__.py adjustments.py fixedstreamreceiver.py 
	serverbase.py serverchannelbase.py taskthreads.py 
Added Files:
      Tag: NameGeddon-branch
	selecttrigger.py 
Log Message:
Move selecttrigger up; you can't have a subpackage named 'thread' and
also import the built-in thread module, so getting rid of the thread
subpackage (which only contains selecttrigger.py).

BTW, selecttrigger.py seems unused and untested here.

=== Added File Zope3/src/zope/server/selecttrigger.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
# -*- Mode: Python; tab-width: 4 -*-

VERSION_STRING = "$Id: selecttrigger.py,v 1.1.2.1 2002/12/23 21:45:27 gvanrossum Exp $"

import asyncore
import asynchat

import os
import socket
import string
import sys
import thread
import traceback

if os.name == 'posix':

    class Trigger(asyncore.file_dispatcher):

        "Wake up a call to select() running in the main thread"

        # This is useful in a context where you are using Medusa's I/O
        # subsystem to deliver data, but the data is generated by another
        # thread.  Normally, if Medusa is in the middle of a call to
        # select(), new output data generated by another thread will have
        # to sit until the call to select() either times out or returns.
        # If the trigger is 'pulled' by another thread, it should immediately
        # generate a READ event on the trigger object, which will force the
        # select() invocation to return.

        # A common use for this facility: letting Medusa manage I/O for a
        # large number of connections; but routing each request through a
        # thread chosen from a fixed-size thread pool.  When a thread is
        # acquired, a transaction is performed, but output data is
        # accumulated into buffers that will be emptied more efficiently
        # by Medusa. [picture a server that can process database queries
        # rapidly, but doesn't want to tie up threads waiting to send data
        # to low-bandwidth connections]

        # The other major feature provided by this class is the ability to
        # move work back into the main thread: if you call pull_trigger()
        # with a thunk argument, when select() wakes up and receives the
        # event it will call your thunk from within that thread.  The main
        # purpose of this is to remove the need to wrap thread locks around
        # Medusa's data structures, which normally do not need them.  [To see
        # why this is true, imagine this scenario: A thread tries to push some
        # new data onto a channel's outgoing data queue at the same time that
        # the main thread is trying to remove some]

        def __init__(self):
            # r is the end asyncore watches for READ events; w is the end
            # pull_trigger() writes to.  Both are remembered in _fds so
            # that close() can release them.
            r, w = self._fds = os.pipe()
            self.trigger = w
            asyncore.file_dispatcher.__init__(self, r)
            # Guards self.thunks, which is appended to from other threads.
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._closed = 0

        # Override the asyncore close() method, because it seems that
        # it would only close the r file descriptor and not w.  The
        # constructor calls file_dispatcher.__init__ and passes r,
        # which would get stored in a file_wrapper and get closed by
        # the default close.  But that would leave w open...

        def close(self):
            """Unregister from asyncore and close both pipe ends (once)."""
            if not self._closed:
                self._closed = 1
                self.del_channel()
                for fd in self._fds:
                    os.close(fd)
                self._fds = []

        def __repr__(self):
            return '<select-trigger (pipe) at %x>' % id(self)

        def readable(self):
            # Always interested in READ events: that is how we get woken.
            return 1

        def writable(self):
            # Nothing is ever written through the dispatcher itself.
            return 0

        def handle_connect(self):
            pass

        def pull_trigger(self, thunk=None):
            """Wake up select(); optionally queue `thunk` for handle_read().

            A queued thunk is called with no arguments from handle_read().
            """
            if thunk:
                self.lock.acquire()
                try:
                    self.thunks.append(thunk)
                finally:
                    self.lock.release()
            os.write(self.trigger, 'x')

        def handle_read(self):
            """Drain the wake-up bytes and run every queued thunk."""
            try:
                self.recv(8192)
            except socket.error:
                return
            # Detach the queued thunks while holding the lock, but run them
            # after releasing it: the lock is not reentrant, so a thunk that
            # itself called pull_trigger(thunk) would otherwise deadlock.
            self.lock.acquire()
            try:
                thunks, self.thunks = self.thunks, []
            finally:
                self.lock.release()
            for thunk in thunks:
                try:
                    thunk()
                except:
                    # Deliberately broad: one failing thunk must not stop
                    # the remaining ones from running.
                    L = traceback.format_exception(*sys.exc_info())
                    print('exception in trigger thunk:\n%s' % "".join(L))

else:
    # XXX Should define a base class that has the common methods and
    # then put the platform-specific parts in a subclass named Trigger.

    # win32-safe version

    # Loopback address and the range of ports tried when creating the
    # connected socket pair used as the trigger.
    HOST = '127.0.0.1'
    MINPORT = 19950
    NPORTS = 50

    class Trigger(asyncore.dispatcher):

        "Wake up a call to select() using a pair of loopback sockets"

        # Shared by all instances so that successive triggers start
        # probing at different ports.
        portoffset = 0

        def __init__(self):
            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            # set TCP_NODELAY to true to avoid buffering
            w.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # tricky: get a pair of connected sockets
            for i in range(NPORTS):
                Trigger.portoffset = (Trigger.portoffset + 1) % NPORTS
                port = MINPORT + Trigger.portoffset
                address = (HOST, port)
                try:
                    a.bind(address)
                except socket.error:
                    continue
                else:
                    break
            else:
                # Do not leak the two sockets when every port is taken.
                a.close()
                w.close()
                raise RuntimeError('Cannot bind trigger!')

            a.listen(1)
            w.setblocking(0)
            try:
                w.connect(address)
            except socket.error:
                # A non-blocking connect normally reports that it is still
                # in progress; the accept() below completes the pair.
                pass
            r, addr = a.accept()
            a.close()
            w.setblocking(1)
            self.trigger = w

            asyncore.dispatcher.__init__(self, r)
            # Guards self.thunks, which is appended to from other threads.
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._trigger_connected = 0

        def close(self):
            """Close the read socket via asyncore, then the write socket.

            The inherited close() only knows about the socket handed to
            dispatcher.__init__ (r); self.trigger (w) is closed here.
            """
            asyncore.dispatcher.close(self)
            self.trigger.close()

        def __repr__(self):
            return '<select-trigger (loopback) at %x>' % id(self)

        def readable(self):
            # Always interested in READ events: that is how we get woken.
            return 1

        def writable(self):
            # Nothing is ever written through the dispatcher itself.
            return 0

        def handle_connect(self):
            pass

        def pull_trigger(self, thunk=None):
            """Wake up select(); optionally queue `thunk` for handle_read().

            A queued thunk is called with no arguments from handle_read().
            """
            if thunk:
                self.lock.acquire()
                try:
                    self.thunks.append(thunk)
                finally:
                    self.lock.release()
            self.trigger.send('x')

        def handle_read(self):
            """Drain the wake-up bytes and run every queued thunk."""
            try:
                self.recv(8192)
            except socket.error:
                return
            # Detach the queued thunks while holding the lock, but run them
            # after releasing it: the lock is not reentrant, so a thunk that
            # itself called pull_trigger(thunk) would otherwise deadlock.
            self.lock.acquire()
            try:
                thunks, self.thunks = self.thunks, []
            finally:
                self.lock.release()
            for thunk in thunks:
                try:
                    thunk()
                except:
                    # Deliberately broad: one failing thunk must not stop
                    # the remaining ones from running.
                    L = traceback.format_exception(*sys.exc_info())
                    print('exception in trigger thunk:\n%s' % "".join(L))


# Module-wide trigger shared by all TriggerFile instances; created lazily
# by the first TriggerFile constructed.
the_trigger = None

class TriggerFile:
    """A 'triggered' file object.

    Data written here is buffered and handed to ``parent.push()`` through
    the shared trigger, i.e. from whatever thread runs the trigger's
    handle_read(), rather than from the thread doing the writing.
    """

    # Buffered data is pushed once it grows beyond this many characters.
    buffer_size = 4096

    def __init__(self, parent):
        global the_trigger
        if the_trigger is None:
            the_trigger = Trigger()
        self.parent = parent
        self.buffer = ''

    def _push_thunk(self, data):
        """Return a thunk that pushes `data` to the current parent.

        The parent is captured now rather than looked up when the thunk
        runs: close() resets self.parent to None right after flushing,
        which would otherwise make the queued thunk fail.
        """
        parent = self.parent
        return lambda: parent.push(data)

    def write(self, data):
        """Buffer `data`; push the buffer once it exceeds buffer_size."""
        self.buffer = self.buffer + data
        if len(self.buffer) > self.buffer_size:
            d, self.buffer = self.buffer, ''
            the_trigger.pull_trigger(self._push_thunk(d))

    def writeline(self, line):
        """Write `line` followed by CRLF."""
        self.write(line + '\r\n')

    def writelines(self, lines):
        """Write each of `lines` followed by CRLF."""
        self.write("\r\n".join(lines) + "\r\n")

    def flush(self):
        """Push any buffered data to the parent via the trigger."""
        if self.buffer:
            d, self.buffer = self.buffer, ''
            the_trigger.pull_trigger(self._push_thunk(d))

    def softspace(self, *args):
        pass

    def close(self):
        """Flush remaining data and drop the reference to the parent."""
        # in a derived class, you may want to call trigger_close() instead.
        self.flush()
        self.parent = None

    def trigger_close(self):
        """Push remaining data, then ask the parent to close when done."""
        d, self.buffer = self.buffer, ''
        p, self.parent = self.parent, None
        the_trigger.pull_trigger(lambda: (p.push(d), p.close_when_done()))

if __name__ == '__main__':
    # Small demo: a line-oriented server on port 9003.  Each received
    # line starts with a count; a worker thread then writes that many
    # lines back through a TriggerFile.  An empty line shuts everything
    # down.

    import time

    def thread_function(output_file, i, n):
        """Write `n` numbered lines to `output_file`, one every 5 seconds."""
        print('entering thread_function')
        remaining = n
        while remaining:
            time.sleep(5)
            output_file.write('%2d.%2d %s\r\n' % (i, remaining, output_file))
            output_file.flush()
            remaining -= 1
        output_file.close()
        print('exiting thread_function')

    class thread_parent(asynchat.async_chat):
        """Channel that starts one worker thread per received line."""

        def __init__(self, conn, addr):
            self.addr = addr
            asynchat.async_chat.__init__(self, conn)
            self.set_terminator('\r\n')
            self.buffer = ''
            self.count = 0

        def collect_incoming_data(self, data):
            self.buffer += data

        def found_terminator(self):
            line, self.buffer = self.buffer, ''
            if line:
                # The first whitespace-separated field is the line count.
                repeat = int(line.split()[0])
                out = TriggerFile(self)
                self.count += 1
                thread.start_new_thread(thread_function,
                                        (out, self.count, repeat))
                return
            # An empty line is the signal to stop the demo.
            asyncore.close_all()
            print('done')

    class ThreadServer(asyncore.dispatcher):
        """Listening socket that wraps each connection in a thread_parent."""

        def __init__(self, family=socket.AF_INET, address=('', 9003)):
            asyncore.dispatcher.__init__(self)
            self.create_socket(family, socket.SOCK_STREAM)
            self.set_reuse_addr()
            self.bind(address)
            self.listen(5)

        def handle_accept(self):
            conn, addr = self.accept()
            thread_parent(conn, addr)

    ThreadServer()
    #asyncore.loop(1.0, use_poll=1)
    try:
        asyncore.loop()
    except:
        # Any interruption of the loop closes every open channel.
        asyncore.close_all()


=== Zope3/src/zope/server/__init__.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/src/zope/server/__init__.py:1.1.2.1	Mon Dec 23 14:33:18 2002
+++ Zope3/src/zope/server/__init__.py	Mon Dec 23 16:45:27 2002
@@ -11,16 +11,14 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-"""
-Zope.Server package.
+"""Zope.Server package.
 
 $Id$
 """
 
-from zope.server.interfaces.interfaces import IDispatcher
-from zope.interface.implements import implements
-
 import asyncore
 
-implements(asyncore.dispatcher, IDispatcher, 0)
+from zope.server.interfaces import IDispatcher
+from zope.interface.implements import implements
 
+implements(asyncore.dispatcher, IDispatcher, 0)


=== Zope3/src/zope/server/adjustments.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/src/zope/server/adjustments.py:1.1.2.1	Mon Dec 23 14:33:18 2002
+++ Zope3/src/zope/server/adjustments.py	Mon Dec 23 16:45:27 2002
@@ -1,4 +1,7 @@
-# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
@@ -6,8 +9,14 @@
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Adjustments are tunable parameters.
+
+$Id$
+"""
 
-import zope.server.maxsockets
+from zope.server import maxsockets
 
 
 class Adjustments:
@@ -42,7 +51,7 @@
     inbuf_overflow = 525000
 
     # Stop accepting new connections if too many are already active.
-    connection_limit = MaxSockets.max_select_sockets() - 3  # Safe
+    connection_limit = maxsockets.max_select_sockets() - 3  # Safe
 
     # Minimum seconds between cleaning up inactive channels.
     cleanup_interval = 300


=== Zope3/src/zope/server/fixedstreamreceiver.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/src/zope/server/fixedstreamreceiver.py:1.1.2.1	Mon Dec 23 14:33:18 2002
+++ Zope3/src/zope/server/fixedstreamreceiver.py	Mon Dec 23 16:45:27 2002
@@ -16,7 +16,7 @@
 $Id$
 """
 
-from zope.server.interfaces.interfaces import IStreamConsumer
+from zope.server.interfaces import IStreamConsumer
 
 
 class FixedStreamReceiver:


=== Zope3/src/zope/server/serverbase.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/src/zope/server/serverbase.py:1.1.2.1	Mon Dec 23 14:33:18 2002
+++ Zope3/src/zope/server/serverbase.py	Mon Dec 23 16:45:27 2002
@@ -22,7 +22,7 @@
 
 from zope.server.adjustments import default_adj
 
-from zope.server.interfaces.interfaces import IServer
+from zope.server.interfaces import IServer
 
 
 class ServerBase(asyncore.dispatcher, object):


=== Zope3/src/zope/server/serverchannelbase.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/src/zope/server/serverchannelbase.py:1.1.2.1	Mon Dec 23 14:33:18 2002
+++ Zope3/src/zope/server/serverchannelbase.py	Mon Dec 23 16:45:27 2002
@@ -31,7 +31,7 @@
 else:
     from zope.server.dualmodechannel import DualModeChannel as ChannelBaseClass
 
-from zope.server.interfaces.interfaces import IServerChannel
+from zope.server.interfaces import IServerChannel
 
 # Synchronize access to the "running_tasks" attributes.
 running_lock = allocate_lock()


=== Zope3/src/zope/server/taskthreads.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/src/zope/server/taskthreads.py:1.1.2.1	Mon Dec 23 14:33:18 2002
+++ Zope3/src/zope/server/taskthreads.py	Mon Dec 23 16:45:27 2002
@@ -14,8 +14,7 @@
 from time import time, sleep
 import logging
 
-from zope.server.interfaces.interfaces import ITaskDispatcher
-
+from zope.server.interfaces import ITaskDispatcher
 
 
 class ThreadedTaskDispatcher: