[Checkins] SVN: zc.ngi/trunk/ Added support for running multiple ``async`` implementations in
Jim Fulton
jim at zope.com
Mon Jun 21 07:08:59 EDT 2010
Log message for revision 113724:
Added support for running multiple ``async`` implementations in
separate threads. This is useful in applications with fewer network
connections and with handlers that tend to perform long-lasting
computations that would be unacceptable with a single select loop.
Changed:
U zc.ngi/trunk/README.txt
U zc.ngi/trunk/src/zc/ngi/async.py
U zc.ngi/trunk/src/zc/ngi/tests.py
-=-
Modified: zc.ngi/trunk/README.txt
===================================================================
--- zc.ngi/trunk/README.txt 2010-06-21 10:37:59 UTC (rev 113723)
+++ zc.ngi/trunk/README.txt 2010-06-21 11:08:59 UTC (rev 113724)
@@ -36,6 +36,11 @@
``zc.ngi.blocking.request``. Other older blocking APIs are
deprecated.
+- Added support for running multiple ``async`` implementations in
+ separate threads. This is useful in applications with fewer network
+ connections and with handlers that tend to perform long-lasting
+ computations that would be unacceptable with a single select loop.
+
- Dropped support for Python 2.4.
Bugs Fixed:
Modified: zc.ngi/trunk/src/zc/ngi/async.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.py 2010-06-21 10:37:59 UTC (rev 113723)
+++ zc.ngi/trunk/src/zc/ngi/async.py 2010-06-21 11:08:59 UTC (rev 113724)
@@ -12,8 +12,6 @@
#
##############################################################################
"""Asyncore-based implementation of the NGI
-
-$Id$
"""
import asyncore
@@ -29,9 +27,6 @@
pid = os.getpid()
is_win32 = sys.platform == 'win32'
-_map = {}
-_connectors = {}
-
expected_socket_read_errors = {
errno.EWOULDBLOCK: 0,
errno.EAGAIN: 0,
@@ -47,11 +42,83 @@
BUFFER_SIZE = 8*1024
+class Implementation:
+
+ def __init__(self):
+ self._map = {}
+ self._trigger = _Trigger(self._map)
+ self.notify_select = self._trigger.pull_trigger
+ self._start_lock = threading.Lock()
+
+ def call_from_thread(self, func):
+ self._trigger.callbacks.append(func)
+ self.notify_select()
+
+ def connect(self, addr, handler):
+ self.start_thread()
+ self.call_from_thread(lambda : _Connector(addr, handler, self))
+
+ def listener(self, addr, handler):
+ self.start_thread()
+ return _Listener(addr, handler, self)
+
+ def udp(self, address, message):
+ if isinstance(address, str):
+ family = socket.AF_UNIX
+ else:
+ family = socket.AF_INET
+ try:
+ sock = _udp_socks[family].pop()
+ except IndexError:
+ sock = socket.socket(family, socket.SOCK_DGRAM)
+
+ sock.sendto(message, address)
+ _udp_socks[family].append(sock)
+
+ def udp_listener(self, addr, handler, buffer_size=4096):
+ self.start_thread()
+ return _UDPListener(addr, handler, buffer_size, self)
+
+ _thread = None
+ def start_thread(self, daemon=True, name=__name__):
+ self._start_lock.acquire()
+ try:
+ if self._thread is None:
+ self._thread = threading.Thread(target=self._loop, name=name)
+ self._thread.setDaemon(True)
+ self._thread.start()
+ finally:
+ self._start_lock.release()
+
+ def _loop(self):
+ timeout = 30.0
+ map = self._map
+ logger = logging.getLogger('zc.ngi.async.loop')
+
+ while map:
+ try:
+ asyncore.poll(timeout, map)
+ except:
+ traceback.print_exception(*sys.exc_info())
+ #logger.exception('loop error')
+ raise
+
+ def cleanup_map(self):
+ for c in self._map.values():
+ if isinstance(c, _Trigger):
+ continue
+ try:
+ del self._map[c.fileno()]
+ except KeyError:
+ pass
+ c.close()
+
class dispatcher(asyncore.dispatcher):
- def __init__(self, sock, addr, map=_map):
+ def __init__(self, sock, addr, implementation):
self.addr = addr
- asyncore.dispatcher.__init__(self, sock, map)
+ self.implementation = implementation
+ asyncore.dispatcher.__init__(self, sock, implementation._map)
def handle_error(self):
reason = sys.exc_info()[1]
@@ -65,8 +132,8 @@
self.close()
def close(self):
- self.del_channel(_map)
- self.socket.close()
+ self.del_channel(self._map)
+ self.implementation.call_from_thread(self.socket.close)
def writable(self):
return False
@@ -75,13 +142,13 @@
control = None
- def __init__(self, sock, addr, logger):
+ def __init__(self, sock, addr, logger, implementation):
self.__connected = True
self.__closed = None
self.__handler = None
self.__iterator_exception = None
self.__output = []
- dispatcher.__init__(self, sock, addr)
+ dispatcher.__init__(self, sock, addr, implementation)
self.logger = logger
def __nonzero__(self):
@@ -114,14 +181,14 @@
self.logger.debug('write %r', data)
assert isinstance(data, str) or (data is zc.ngi.END_OF_DATA)
self.__output.append(data)
- notify_select()
+ self.implementation.notify_select()
def writelines(self, data):
if __debug__:
self.logger.debug('writelines %r', data)
assert not isinstance(data, str), "writelines does not accept strings"
self.__output.append(iter(data))
- notify_select()
+ self.implementation.notify_select()
def close(self):
self.__connected = False
@@ -129,7 +196,7 @@
dispatcher.close(self)
if self.control is not None:
self.control.closed(self)
- notify_select()
+ self.implementation.notify_select()
def readable(self):
return self.__handler is not None
@@ -231,9 +298,8 @@
def handle_expt(self):
self.handle_close('socket error')
+class _Connector(dispatcher):
-class connector(dispatcher):
-
logger = logging.getLogger('zc.ngi.async.client')
# When trying to do a connect on a non-blocking socket, some outcomes
@@ -252,24 +318,20 @@
_CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
_CONNECT_OK = (0, errno.EISCONN)
- def __init__(self, addr, handler):
- if not _thread:
- start_thread()
+ def __init__(self, addr, handler, implementation):
self.__handler = handler
if isinstance(addr, str):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- dispatcher.__init__(self, sock, addr, _connectors)
+ dispatcher.__init__(self, sock, addr, implementation)
- notify_select()
-
- def connect(self):
if __debug__:
self.logger.debug('connecting to %s', self.addr)
- self.add_channel(_map)
+ # INVARIANT: we are called from the select thread!
+
try:
self.handle_write_event()
except:
@@ -301,11 +363,12 @@
self.handle_close(reason)
return
- self.del_channel(_map)
+ self.del_channel(self._map)
if __debug__:
self.logger.debug('outgoing connected %r', self.addr)
- connection = _Connection(self.socket, self.addr, self.logger)
+ connection = _Connection(self.socket, self.addr, self.logger,
+ self.implementation)
try:
self.__handler.connected(connection)
except:
@@ -327,9 +390,6 @@
def handle_expt(self):
self.handle_close('connection failed')
-def connect(*args):
- connector(*args)
-
class BaseListener(asyncore.dispatcher):
def writable(self):
@@ -339,25 +399,26 @@
# work around file-dispatcher bug
if map is None:
return
- assert (map is _map)
- asyncore.dispatcher.add_channel(self, _map)
+ assert (map is self._map)
+ asyncore.dispatcher.add_channel(self, self._map)
def handle_error(self):
reason = sys.exc_info()[1]
- self.logger.exception('listener error')
+ #self.logger.exception('listener error')
+ traceback.print_exception(*sys.exc_info())
self.close()
-class listener(BaseListener):
+class _Listener(BaseListener):
logger = logging.getLogger('zc.ngi.async.server')
- def __init__(self, addr, handler):
- if not _thread:
- start_thread()
+ def __init__(self, addr, handler, implementation):
+ self.implementation = implementation
+ map = implementation._map
self.__handler = handler
self.__close_handler = None
self.__connections = {}
- asyncore.dispatcher.__init__(self)
+ asyncore.dispatcher.__init__(self, map=map)
if isinstance(addr, str):
family = socket.AF_UNIX
else:
@@ -391,9 +452,10 @@
self.close()
self.logger.warn("unable to listen on %r", addr)
raise
- self.add_channel(_map)
+
+ self.add_channel(map)
self.address = addr
- notify_select()
+ self.implementation.notify_select()
def handle_accept(self):
if not self.accepting:
@@ -407,12 +469,13 @@
# didn't get anything. Hm. Ignore.
return
except socket.error, msg:
- self.logger.exception("accepted failed: %s", msg)
+ traceback.print_exception(*sys.exc_info())
+ #self.logger.exception("accepted failed: %s", msg)
return
if __debug__:
self.logger.debug('incoming connection %r', addr)
- connection = _Connection(sock, addr, self.logger)
+ connection = _Connection(sock, addr, self.logger, self.implementation)
self.__connections[connection] = 1
connection.control = self
try:
@@ -432,8 +495,8 @@
def close(self, handler=None):
self.accepting = False
- self.del_channel(_map)
- call_from_thread(self.socket.close)
+ self.del_channel(self._map)
+ self.implementation.call_from_thread(self.socket.close)
if handler is None:
for c in list(self.__connections):
c.handle_close("stopped")
@@ -442,21 +505,21 @@
else:
self.__close_handler = handler
- # convenience method made possible by storaing out address:
+ # convenience method made possible by storing our address:
def connect(self, handler):
- connect(self.address, handler)
+ self.implementation.connect(self.address, handler)
-class udp_listener(BaseListener):
+class _UDPListener(BaseListener):
logger = logging.getLogger('zc.ngi.async.udpserver')
connected = True
- def __init__(self, addr, handler, buffer_size=4096):
- if not _thread:
- start_thread()
+ def __init__(self, addr, handler, buffer_size, implementation):
+ self.implementation = implementation
+ map = implementation._map
self.__handler = handler
self.__buffer_size = buffer_size
- asyncore.dispatcher.__init__(self)
+ asyncore.dispatcher.__init__(self, map=map)
if isinstance(addr, str):
family = socket.AF_UNIX
else:
@@ -471,33 +534,22 @@
self.close()
self.logger.warn("unable to listen on udp %r", addr)
raise
- self.add_channel(_map)
- notify_select()
+ self.add_channel(map)
+ self.implementation.notify_select()
def handle_read(self):
message, addr = self.recvfrom(self.__buffer_size)
self.__handler(addr, message)
def close(self):
- self.del_channel(_map)
- call_from_thread(self.socket.close)
+ self.del_channel(self._map)
+ self.implementation.call_from_thread(self.socket.close)
# udp uses GIL to get thread-safe socket management
if is_win32:
_udp_socks = {socket.AF_INET: []}
else:
_udp_socks = {socket.AF_INET: [], socket.AF_UNIX: []}
-def udp(address, message):
- if isinstance(address, str):
- family = socket.AF_UNIX
- else:
- family = socket.AF_INET
- try:
- sock = _udp_socks[family].pop()
- except IndexError:
- sock = socket.socket(family, socket.SOCK_DGRAM)
- sock.sendto(message, address)
- _udp_socks[family].append(sock)
# The following trigger code is greatly simplified from the Medusa
# trigger code.
@@ -536,15 +588,17 @@
if os.name == 'posix':
class _Trigger(_Triggerbase, asyncore.file_dispatcher):
- def __init__(self):
+ def __init__(self, map):
_Triggerbase.__init__(self)
- self.__readfd, self.__writefd = os.pipe()
- asyncore.file_dispatcher.__init__(self, self.__readfd)
+ r, self.__writefd = os.pipe()
+ asyncore.file_dispatcher.__init__(self, r, map)
+ # file_dispatcher dups r, so we don't need it any more
+ os.close(r)
+
def close(self):
- self.del_channel(_map)
os.close(self.__writefd)
- os.close(self.__readfd)
+ asyncore.file_dispatcher.close(self)
def pull_trigger(self):
if __debug__:
@@ -553,16 +607,20 @@
def add_channel(self, map=None):
# work around file-dispatcher bug
- assert (map is None) or (map is _map)
- asyncore.dispatcher.add_channel(self, _map)
+ assert (map is None) or (map is self._map)
+ asyncore.dispatcher.add_channel(self, self._map)
else:
# Windows version; uses just sockets, because a pipe isn't select'able
# on Windows.
+ class BindError(Exception):
+ pass
+
class _Trigger(_Triggerbase, asyncore.dispatcher):
- def __init__(self):
+ def __init__(self, map):
_Triggerbase.__init__(self)
+ _Triggerbase.__init__(self)
# Get a pair of connected sockets. The trigger is the 'w'
# end of the pair, which is connected to 'r'. 'r' is put
@@ -614,10 +672,10 @@
r, addr = a.accept() # r becomes asyncore's (self.)socket
a.close()
self.trigger = w
- asyncore.dispatcher.__init__(self, r, _map)
+ asyncore.dispatcher.__init__(self, r, map)
def close(self):
- self.del_channel(_map)
+ self.del_channel(self._map)
# self.socket is r, and self.trigger is w, from __init__
self.socket.close()
self.trigger.close()
@@ -627,43 +685,13 @@
self.logger.debug('notify select %s', pid)
self.trigger.send('x')
-_trigger = _Trigger()
+_select_implementation = Implementation()
-notify_select = _trigger.pull_trigger
-
-def call_from_thread(func):
- _trigger.callbacks.append(func)
- notify_select()
-
-def loop():
- timeout = 30.0
- map = _map
- connectors = _connectors
- logger = logging.getLogger('zc.ngi.async.loop')
- list_ = list
-
- while map:
- for f in list_(connectors):
- c = connectors.pop(f)
- c.connect()
-
- try:
- asyncore.poll(timeout, map)
- except:
- print sys.exc_info()[0]
- logger.exception('loop error')
- raise
-
-_thread = None
-_start_lock = threading.Lock()
-def start_thread(daemon=True):
- global _thread
- _start_lock.acquire()
- try:
- if _thread is not None:
- return
- _thread = threading.Thread(target=loop, name=__name__)
- _thread.setDaemon(daemon)
- _thread.start()
- finally:
- _start_lock.release()
+call_from_thread = _select_implementation.call_from_thread
+connect = connector = _select_implementation.connect
+listener = _select_implementation.listener
+start_thread = _select_implementation.start_thread
+udp = _select_implementation.udp
+udp_listener = _select_implementation.udp_listener
+_map = _select_implementation._map
+cleanup_map = _select_implementation.cleanup_map
Modified: zc.ngi/trunk/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/tests.py 2010-06-21 10:37:59 UTC (rev 113723)
+++ zc.ngi/trunk/src/zc/ngi/tests.py 2010-06-21 11:08:59 UTC (rev 113724)
@@ -331,6 +331,9 @@
#import logging
#logging.getLogger().addHandler(logging.StreamHandler())
+ # clean up the map.
+ zc.ngi.async.cleanup_map()
+
# See if we can break the main loop before running the async test
# Connect to bad port with bad handler
More information about the checkins
mailing list