[Zope3-checkins] CVS: Zope3/src/zodb/zeo/zrpc - __init__.py:1.1.2.1 client.py:1.1.2.1 connection.py:1.1.2.1 error.py:1.1.2.1 log.py:1.1.2.1 marshal.py:1.1.2.1 server.py:1.1.2.1 smac.py:1.1.2.1 trigger.py:1.1.2.1

Jim Fulton <jim@zope.com>
Mon, 23 Dec 2002 14:30:56 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo/zrpc
In directory cvs.zope.org:/tmp/cvs-serv19908/zodb/zeo/zrpc

Added Files:
      Tag: NameGeddon-branch
	__init__.py client.py connection.py error.py log.py marshal.py 
	server.py smac.py trigger.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zodb/zeo/zrpc/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# zrpc is a package with the following modules
# client -- manages connection creation to remote server
# connection -- object dispatcher
# log -- logging helper
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# server -- manages incoming connections from remote clients
# smac -- sized message async connections
# trigger -- medusa's trigger

# zrpc is not an advertised subpackage of ZEO; its interfaces are internal
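
# Rough usage sketch (assumptions only; none of this is an advertised API,
# and the (host, port) address form accepted by ConnectionManager is
# inferred from client.py rather than documented here):
#
#     from zodb.zeo.zrpc.client import ConnectionManager
#     from zodb.zeo.zrpc.server import Dispatcher
#
#     # client side: keep a connection to the server up over time
#     cm = ConnectionManager(('localhost', 8100), client_object)
#
#     # server side: accept incoming zrpc connections
#     dispatcher = Dispatcher(('', 8100))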


=== Added File Zope3/src/zodb/zeo/zrpc/client.py === (421/521 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import errno
import select
import socket
import sys
import threading
import time
import types

import ThreadedAsync

from zodb.interfaces import ReadOnlyError

from zodb.zeo.zrpc import log
from zodb.zeo.zrpc.trigger import trigger
from zodb.zeo.zrpc.connection import ManagedConnection

class ConnectionManager:
    """Keeps a connection up over time"""

    def __init__(self, addrs, client, tmin=1, tmax=180):
        self.addrlist = self._parse_addrs(addrs)
        self.client = client
        self.tmin = tmin
        self.tmax = tmax
        self.cond = threading.Condition(threading.Lock())
        self.connection = None # Protected by self.cond
        self.closed = 0
        # If thread is not None, then there is a helper thread
        # attempting to connect.
        self.thread = None # Protected by self.cond
        self.trigger = None
        self.thr_async = 0
        ThreadedAsync.register_loop_callback(self.set_async)

    def __repr__(self):
        return "<%s for %s>" % (self.__class__.__name__, self.addrlist)


[-=- -=- -=- 421 lines omitted -=- -=- -=-]

            self.state = "tested"
        except ReadOnlyError:
            log.info("CW: ReadOnlyError in testConnection (%s)",
                      repr(self.addr))
            self.close()
            return
        except:
            log.error("CW: error in testConnection (%s)", repr(self.addr),
                      exc_info=True)
            self.close()
            return
        if self.preferred:
            self.notify_client()

    def notify_client(self):
        """Call the client's notifyConnected().

        If this succeeds, call the manager's connect_done().

        If the client is already connected, we assume it's a fallback
        connection, and the new connection must be a preferred
        connection.  The client will close the old connection.
        """
        try:
            self.client.notifyConnected(self.conn)
        except:
            log.error("CW: error in notifyConnected (%s)", repr(self.addr),
                      exc_info=True)
            self.close()
            return
        self.state = "notified"
        self.mgr.connect_done(self.conn, self.preferred)

    def close(self):
        """Close the socket and reset everything."""
        self.state = "closed"
        self.mgr = self.client = None
        self.preferred = 0
        if self.conn is not None:
            # Closing the ZRPC connection will eventually close the
            # socket, somewhere in asyncore.
            # XXX Why do we care? --Guido
            self.conn.close()
            self.conn = None
        if self.sock is not None:
            self.sock.close()
            self.sock = None

    def fileno(self):
        return self.sock.fileno()


=== Added File Zope3/src/zodb/zeo/zrpc/connection.py === (407/507 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import errno
import select
import sys
import threading
import types
import logging

import ThreadedAsync
from zodb.zeo.zrpc import smac
from zodb.zeo.zrpc.error import ZRPCError, DisconnectedError
from zodb.zeo.zrpc import log
from zodb.zeo.zrpc.marshal import Marshaller
from zodb.zeo.zrpc.trigger import trigger
from zodb import POSException

REPLY = ".reply" # message name used for replies
ASYNC = 1

class Delay:
    """Used to delay response to client for synchronous calls

    When a synchronous call is made and the original handler returns
    without handling the call, it returns a Delay object that prevents
    the mainloop from sending a response.
    """

    def set_sender(self, msgid, send_reply, return_error):
        self.msgid = msgid
        self.send_reply = send_reply
        self.return_error = return_error

    def reply(self, obj):
        self.send_reply(self.msgid, obj)

    def error(self, exc_info):
        log.error("Error raised in delayed method", exc_info=True)

[-=- -=- -=- 407 lines omitted -=- -=- -=-]

        self.mgr = mgr
        self.__super_init(sock, addr, obj)
        self.obj.notifyConnected(self)

    def close(self):
        self.obj.notifyDisconnected()
        self.mgr.close_conn(self)
        self.__super_close()

class ManagedConnection(Connection):
    """Client-side Connection subclass."""
    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        self.mgr = mgr
        self.__super_init(sock, addr, obj)
        self.check_mgr_async()

    # Defer the ThreadedAsync work to the manager.

    def close_trigger(self):
        # the manager should actually close the trigger
        del self.trigger

    def set_async(self, map):
        pass

    def _prepare_async(self):
        # Don't do the register_loop_callback that the superclass does
        pass

    def check_mgr_async(self):
        if not self.thr_async and self.mgr.thr_async:
            assert self.mgr.trigger is not None, \
                   "manager (%s) has no trigger" % self.mgr
            self.thr_async = 1
            self.trigger = self.mgr.trigger
            return 1
        return 0

    def is_async(self):
        # XXX could the check_mgr_async() be avoided on each test?
        if self.thr_async:
            return 1
        return self.check_mgr_async()

    def close(self):
        self.mgr.close_conn(self)
        self.__super_close()
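
if __name__ == '__main__':
    # Hedged illustration of the Delay protocol described in the class
    # docstring above; the two callbacks are invented stand-ins for the
    # reply machinery the mainloop normally passes to set_sender().
    def _send_reply(msgid, obj):
        print 'reply for %d: %r' % (msgid, obj)
    def _return_error(msgid, flags, *exc):
        print 'error for %d: %r' % (msgid, exc)
    d = Delay()
    d.set_sender(42, _send_reply, _return_error)
    d.reply('result computed later')   # prints "reply for 42: ..."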


=== Added File Zope3/src/zodb/zeo/zrpc/error.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from zodb import POSException
from zodb.zeo.exceptions import Disconnected

class ZRPCError(POSException.StorageError):
    pass

class DisconnectedError(ZRPCError, Disconnected):
    """The database storage is disconnected from the storage server."""


=== Added File Zope3/src/zodb/zeo/zrpc/log.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import types
import threading
import logging

LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging

_label = "zrpc:%s" % os.getpid()

# The code duplication here is for speed (it saves a layer of function calls).

def critical(msg, *args, **kw):
    label = _label
    if LOG_THREAD_ID:
        label = "%s:%s" % (label, threading.currentThread().getName())
    logging.critical("%s: "+msg, label, *args, **kw)

def error(msg, *args, **kw):
    label = _label
    if LOG_THREAD_ID:
        label = "%s:%s" % (label, threading.currentThread().getName())
    logging.error("%s: "+msg, label, *args, **kw)

def warn(msg, *args, **kw):
    label = _label
    if LOG_THREAD_ID:
        label = "%s:%s" % (label, threading.currentThread().getName())
    logging.warn("%s: "+msg, label, *args, **kw)

def info(msg, *args, **kw):
    label = _label
    if LOG_THREAD_ID:
        label = "%s:%s" % (label, threading.currentThread().getName())
    logging.info("%s: "+msg, label, *args, **kw)

def debug(msg, *args, **kw):
    label = _label
    if LOG_THREAD_ID:
        label = "%s:%s" % (label, threading.currentThread().getName())
    logging.debug("%s: "+msg, label, *args, **kw)
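
# Typical call sites elsewhere in the package look like this (arguments
# shown are illustrative):
#
#     log.info("connect from %s: %s", repr(addr), c)
#     log.error("can't decode message: %s", short_repr(msg), exc_info=True)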

REPR_LIMIT = 40

def short_repr(obj):
    "Return an object repr limited to REPR_LIMIT bytes."

    # Some of the objects being repr'd are large strings.  It wastes
    # a lot of memory to repr them and then truncate, so we special-case
    # them in this function.
    # Also handle a short repr of a tuple containing a long string.

    # This strategy works well for arguments to StorageServer methods.
    # The oid is usually first and will get included in its entirety.
    # The pickle is near the beginning, too, and you can often fit the
    # module name in the pickle.

    if isinstance(obj, types.StringType):
        if len(obj) > REPR_LIMIT:
            r = repr(obj[:REPR_LIMIT])
        else:
            r = repr(obj)
        if len(r) > REPR_LIMIT:
            r = r[:REPR_LIMIT-4] + '...' + r[-1]
        return r
    elif isinstance(obj, types.TupleType):
        elts = []
        size = 0
        for elt in obj:
            r = repr(elt)
            elts.append(r)
            size += len(r)
            if size > REPR_LIMIT:
                break
        r = "(%s)" % (", ".join(elts))
    else:
        r = repr(obj)
    if len(r) > REPR_LIMIT:
        return r[:REPR_LIMIT] + '...'
    else:
        return r
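
if __name__ == '__main__':
    # Hedged illustration (values invented): long strings and tuples are
    # truncated to roughly REPR_LIMIT characters.
    print short_repr('x' * 1000)
    print short_repr(('\0' * 8, 'a long pickle payload ' * 10, 1))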


=== Added File Zope3/src/zodb/zeo/zrpc/marshal.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import cPickle
from cStringIO import StringIO
import types

from zodb.zeo.zrpc.error import ZRPCError
from zodb.zeo.zrpc import log

class Marshaller:
    """Marshal requests and replies to second across network"""

    def encode(self, msgid, flags, name, args):
        """Returns an encoded message"""
        # (We used to have a global pickler, but that's not thread-safe. :-( )
        pickler = cPickle.Pickler()
        pickler.fast = 1
        return pickler.dump((msgid, flags, name, args), 1)

    def decode(self, msg):
        """Decodes msg and returns its parts"""
        unpickler = cPickle.Unpickler(StringIO(msg))
        unpickler.find_global = find_global

        try:
            return unpickler.load() # msgid, flags, name, args
        except:
            log.error("can't decode message: %s", log.short_repr(msg))
            raise

_globals = globals()
_silly = ('__doc__',)

def find_global(module, name):
    """Helper for message unpickler"""
    try:
        m = __import__(module, _globals, _globals, _silly)
    except ImportError, msg:
        raise ZRPCError("import error %s: %s" % (module, msg))

    try:
        r = getattr(m, name)
    except AttributeError:
        raise ZRPCError("module %s has no global %s" % (module, name))

    safe = getattr(r, '__no_side_effects__', 0)
    if safe:
        return r

    # XXX what's a better way to do this?  esp w/ 2.1 & 2.2
    if type(r) == types.ClassType and issubclass(r, Exception):
        return r

    raise ZRPCError("Unsafe global: %s.%s" % (module, name))


=== Added File Zope3/src/zodb/zeo/zrpc/server.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import socket
import types

from zodb.zeo.zrpc.connection import Connection, Delay
from zodb.zeo.zrpc import log
import ThreadedAsync

# Export the main asyncore loop
loop = ThreadedAsync.loop

class Dispatcher(asyncore.dispatcher):
    """A server that accepts incoming RPC connections"""
    __super_init = asyncore.dispatcher.__init__

    reuse_addr = 1

    def __init__(self, addr, factory=Connection, reuse_addr=None):
        self.__super_init()
        self.addr = addr
        self.factory = factory
        self.clients = []
        if reuse_addr is not None:
            self.reuse_addr = reuse_addr
        self._open_socket()

    def _open_socket(self):
        if type(self.addr) == types.TupleType:
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.set_reuse_addr()
        log.info("listening on %s", str(self.addr))
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        return 0

    def readable(self):
        return 1

    def handle_accept(self):
        try:
            sock, addr = self.accept()
        except socket.error, msg:
            log.info("accepted failed: %s", msg)
            return
        c = self.factory(sock, addr)
        log.info("connect from %s: %s", repr(addr), c)
        self.clients.append(c)
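
if __name__ == '__main__':
    # Hedged usage sketch: bind to an ephemeral localhost port just to show
    # the constructor; a real server passes a Connection factory wired to a
    # storage object and then enters the exported loop().
    d = Dispatcher(('127.0.0.1', 0))
    print 'listening on', d.socket.getsockname()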


=== Added File Zope3/src/zodb/zeo/zrpc/smac.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Sized Message Async Connections."""

import asyncore, struct
import threading
import socket, errno
from types import StringType

from zodb.zeo.exceptions import Disconnected
from zodb.zeo.zrpc import log

# Use the dictionary to make sure we get the minimum number of errno
# entries.   We expect that EWOULDBLOCK == EAGAIN on most systems --
# or that only one is actually used.

tmp_dict = {errno.EWOULDBLOCK: 0,
            errno.EAGAIN: 0,
            errno.EINTR: 0,
            }
expected_socket_read_errors = tuple(tmp_dict.keys())

tmp_dict = {errno.EAGAIN: 0,
            errno.EWOULDBLOCK: 0,
            errno.ENOBUFS: 0,
            errno.EINTR: 0,
            }
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict

# We chose 60000 as the socket limit by looking at the largest strings
# that we could pass to send() without blocking.
SEND_SIZE = 60000

class SizedMessageAsyncConnection(asyncore.dispatcher):
    __super_init = asyncore.dispatcher.__init__
    __super_close = asyncore.dispatcher.close

    __closed = 1 # Marker indicating that we're closed

    socket = None # to outwit Sam's getattr

    def __init__(self, sock, addr, map=None, debug=None):
        self.addr = addr
        if debug is not None:
            self._debug = debug
        elif not hasattr(self, '_debug'):
            self._debug = __debug__
        # __input_lock protects __inp, __input_len, __state, __msg_size
        self.__input_lock = threading.Lock()
        self.__inp = None # None, a single String, or a list
        self.__input_len = 0
        # Instance variables __state and __msg_size work together:
        #   when __state == 0:
        #     __msg_size == 4, and the next thing read is a message size;
        #   when __state == 1:
        #     __msg_size is variable, and the next thing read is a message.
        # The next thing read is always of length __msg_size.
        # The state alternates between 0 and 1.
        self.__state = 0
        self.__msg_size = 4
        self.__output_lock = threading.Lock() # Protects __output
        self.__output = []
        self.__closed = 0
        self.__super_init(sock, map)

    def get_addr(self):
        return self.addr

    # XXX avoid expensive getattr calls?  Can't remember exactly what
    # this comment was supposed to mean, but it has something to do
    # with the way asyncore uses getattr and uses if sock:
    def __nonzero__(self):
        return 1

    def handle_read(self):
        self.__input_lock.acquire()
        try:
            # Use a single __inp buffer and integer indexes to make this fast.
            try:
                d = self.recv(8192)
            except socket.error, err:
                if err[0] in expected_socket_read_errors:
                    return
                raise
            if not d:
                return

            input_len = self.__input_len + len(d)
            msg_size = self.__msg_size
            state = self.__state

            inp = self.__inp
            if msg_size > input_len:
                if inp is None:
                    self.__inp = d
                elif type(self.__inp) is StringType:
                    self.__inp = [self.__inp, d]
                else:
                    self.__inp.append(d)
                self.__input_len = input_len
                return # keep waiting for more input

            # load all previous input and d into single string inp
            if isinstance(inp, StringType):
                inp = inp + d
            elif inp is None:
                inp = d
            else:
                inp.append(d)
                inp = "".join(inp)

            offset = 0
            while (offset + msg_size) <= input_len:
                msg = inp[offset:offset + msg_size]
                offset = offset + msg_size
                if not state:
                    # waiting for message
                    msg_size = struct.unpack(">i", msg)[0]
                    state = 1
                else:
                    msg_size = 4
                    state = 0
                    # XXX We call message_input() with __input_lock
                    # held!!!  And message_input() may end up calling
                    # message_output(), which has its own lock.  But
                    # message_output() cannot call message_input(), so
                    # the locking order is always consistent, which
                    # prevents deadlock.  Also, message_input() may
                    # take a long time, because it can cause an
                    # incoming call to be handled.  During all this
                    # time, the __input_lock is held.  That's a good
                    # thing, because it serializes incoming calls.
                    self.message_input(msg)

            self.__state = state
            self.__msg_size = msg_size
            self.__inp = inp[offset:]
            self.__input_len = input_len - offset
        finally:
            self.__input_lock.release()

    def readable(self):
        return 1

    def writable(self):
        if len(self.__output) == 0:
            return 0
        else:
            return 1

    def handle_write(self):
        self.__output_lock.acquire()
        try:
            output = self.__output
            while output:
                # Accumulate output into a single string so that we avoid
                # multiple send() calls, but avoid accumulating too much
                # data.  If we send a very small string and have more data
                # to send, we will likely incur delays caused by the
                # unfortunate interaction between the Nagle algorithm and
                # delayed acks.  If we send a very large string, only a
                # portion of it will actually be delivered at a time.

                l = 0
                for i in range(len(output)):
                    l += len(output[i])
                    if l > SEND_SIZE:
                        break

                i += 1
                # It is very unlikely that i will be 1.
                v = "".join(output[:i])
                del output[:i]

                try:
                    n = self.send(v)
                except socket.error, err:
                    if err[0] in expected_socket_write_errors:
                        break # we couldn't write anything
                    raise
                if n < len(v):
                    output.insert(0, v[n:])
                    break # we can't write any more
        finally:
            self.__output_lock.release()

    def handle_close(self):
        self.close()

    def message_output(self, message):
        if __debug__:
            if self._debug:
                log.debug('message_output %d bytes: %s',
                          len(message), log.short_repr(message))

        if self.__closed:
            raise Disconnected("Action is temporarily unavailable")
        self.__output_lock.acquire()
        try:
            # do two separate appends to avoid copying the message string
            self.__output.append(struct.pack(">i", len(message)))
            if len(message) <= SEND_SIZE:
                self.__output.append(message)
            else:
                for i in range(0, len(message), SEND_SIZE):
                    self.__output.append(message[i:i+SEND_SIZE])
        finally:
            self.__output_lock.release()

    def close(self):
        if not self.__closed:
            self.__closed = 1
            self.__super_close()
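
if __name__ == '__main__':
    # Hedged illustration of the wire format used by message_output() and
    # handle_read(): a 4-byte big-endian length prefix followed by the
    # message body.  The payload is invented.
    payload = 'hello, peer'
    framed = struct.pack('>i', len(payload)) + payload
    size = struct.unpack('>i', framed[:4])[0]
    assert framed[4:4 + size] == payload
    print '%d-byte message framed as %d bytes' % (size, len(framed))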


=== Added File Zope3/src/zodb/zeo/zrpc/trigger.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

import asyncore
import os
import socket
import sys
import thread
import traceback

if os.name == 'posix':

    class trigger(asyncore.file_dispatcher):

        "Wake up a call to select() running in the main thread"

        # This is useful in a context where you are using Medusa's I/O
        # subsystem to deliver data, but the data is generated by another
        # thread.  Normally, if Medusa is in the middle of a call to
        # select(), new output data generated by another thread will have
        # to sit until the call to select() either times out or returns.
        # If the trigger is 'pulled' by another thread, it should immediately
        # generate a READ event on the trigger object, which will force the
        # select() invocation to return.

        # A common use for this facility: letting Medusa manage I/O for a
        # large number of connections; but routing each request through a
        # thread chosen from a fixed-size thread pool.  When a thread is
        # acquired, a transaction is performed, but output data is
        # accumulated into buffers that will be emptied more efficiently
        # by Medusa. [picture a server that can process database queries
        # rapidly, but doesn't want to tie up threads waiting to send data
        # to low-bandwidth connections]

        # The other major feature provided by this class is the ability to
        # move work back into the main thread: if you call pull_trigger()
        # with a thunk argument, when select() wakes up and receives the
        # event it will call your thunk from within that thread.  The main
        # purpose of this is to remove the need to wrap thread locks around
        # Medusa's data structures, which normally do not need them.  [To see
        # why this is true, imagine this scenario: A thread tries to push some
        # new data onto a channel's outgoing data queue at the same time that
        # the main thread is trying to remove some]

        def __init__(self):
            r, w = self._fds = os.pipe()
            self.trigger = w
            asyncore.file_dispatcher.__init__(self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._closed = 0

        # Override the asyncore close() method, because it seems that
        # it would only close the r file descriptor and not w.  The
        # constructor calls file_dispatcher.__init__ and passes r,
        # which would get stored in a file_wrapper and get closed by
        # the default close.  But that would leave w open...

        def close(self):
            if not self._closed:
                self._closed = 1
                self.del_channel()
                for fd in self._fds:
                    os.close(fd)
                self._fds = []

        def __repr__(self):
            return '<select-trigger (pipe) at %x>' % id(self)

        def readable(self):
            return 1

        def writable(self):
            return 0

        def handle_connect(self):
            pass

        def handle_close(self):
            self.close()

        def pull_trigger(self, thunk=None):
            if thunk:
                self.lock.acquire()
                try:
                    self.thunks.append(thunk)
                finally:
                    self.lock.release()
            os.write(self.trigger, 'x')

        def handle_read(self):
            try:
                self.recv(8192)
            except socket.error:
                return
            self.lock.acquire()
            try:
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        L = traceback.format_exception(*sys.exc_info())
                        print 'exception in trigger thunk:\n%s' % "".join(L)
                self.thunks = []
            finally:
                self.lock.release()

else:

    # XXX Should define a base class that has the common methods and
    # then put the platform-specific in a subclass named trigger.

    # win32-safe version

    HOST = '127.0.0.1'
    MINPORT = 19950
    NPORTS = 50

    class trigger(asyncore.dispatcher):

        portoffset = 0

        def __init__(self):
            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            # set TCP_NODELAY to true to avoid buffering
            w.setsockopt(socket.IPPROTO_TCP, 1, 1)

            # tricky: get a pair of connected sockets
            for i in range(NPORTS):
                trigger.portoffset = (trigger.portoffset + 1) % NPORTS
                port = MINPORT + trigger.portoffset
                address = (HOST, port)
                try:
                    a.bind(address)
                except socket.error:
                    continue
                else:
                    break
            else:
                raise RuntimeError, 'Cannot bind trigger!'

            a.listen(1)
            w.setblocking(0)
            try:
                w.connect(address)
            except:
                pass
            r, addr = a.accept()
            a.close()
            w.setblocking(1)
            self.trigger = w

            asyncore.dispatcher.__init__(self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._trigger_connected = 0

        def __repr__(self):
            return '<select-trigger (loopback) at %x>' % id(self)

        def readable(self):
            return 1

        def writable(self):
            return 0

        def handle_connect(self):
            pass

        def pull_trigger(self, thunk=None):
            if thunk:
                self.lock.acquire()
                try:
                    self.thunks.append(thunk)
                finally:
                    self.lock.release()
            self.trigger.send('x')

        def handle_read(self):
            try:
                self.recv(8192)
            except socket.error:
                return
            self.lock.acquire()
            try:
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        L = traceback.format_exception(*sys.exc_info())
                        print 'exception in trigger thunk:\n%s' % "".join(L)
                self.thunks = []
            finally:
                self.lock.release()
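
if __name__ == '__main__':
    # Hedged self-test sketch (never run by the package): queue a thunk,
    # pull the trigger, and let one asyncore poll() pass dispatch it.
    def _thunk():
        print 'thunk ran via the trigger'
    t = trigger()
    t.pull_trigger(_thunk)
    asyncore.poll(1.0)
    t.close()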