[Checkins] SVN: zc.ngi/branches/jim-thready/src/zc/ngi/ Made the async implementation into an instance so I can play with having more than one.
Jim Fulton
jim at zope.com
Fri Oct 2 18:19:25 EDT 2009
Log message for revision 104758:
Made the async implementation into an instance so I can play with
having more than one.
Changed:
U zc.ngi/branches/jim-thready/src/zc/ngi/async.py
U zc.ngi/branches/jim-thready/src/zc/ngi/interfaces.py
U zc.ngi/branches/jim-thready/src/zc/ngi/tests.py
-=-
Modified: zc.ngi/branches/jim-thready/src/zc/ngi/async.py
===================================================================
--- zc.ngi/branches/jim-thready/src/zc/ngi/async.py 2009-10-02 20:56:47 UTC (rev 104757)
+++ zc.ngi/branches/jim-thready/src/zc/ngi/async.py 2009-10-02 22:19:25 UTC (rev 104758)
@@ -20,19 +20,14 @@
import errno
import logging
import os
-import select
import socket
import sys
import threading
-import time
import zc.ngi
pid = os.getpid()
-_map = {}
-_connectors = {}
-
expected_socket_read_errors = {
errno.EWOULDBLOCK: 0,
errno.EAGAIN: 0,
@@ -48,11 +43,73 @@
BUFFER_SIZE = 8*1024
+class SelectImplementation:
+
+ def __init__(self):
+ self._map = {}
+ self._trigger = _Trigger(self._map)
+ self.notify_select = self._trigger.pull_trigger
+ self._start_lock = threading.Lock()
+
+ def call_from_thread(self, func):
+ self._trigger.callbacks.append(func)
+ self.notify_select()
+
+ def connect(self, addr, handler):
+ self.start_thread()
+ self.call_from_thread(lambda : _Connector(addr, handler, self))
+
+ def listener(self, addr, handler):
+ self.start_thread()
+ return _Listener(addr, handler, self)
+
+ def udp(self, address, message):
+ if isinstance(address, str):
+ family = socket.AF_UNIX
+ else:
+ family = socket.AF_INET
+ try:
+ sock = _udp_socks[family].pop()
+ except IndexError:
+ sock = socket.socket(family, socket.SOCK_DGRAM)
+
+ sock.sendto(message, address)
+ _udp_socks[family].append(sock)
+
+ def udp_listener(self, addr, handler, buffer_size=4096):
+ self.start_thread()
+ return _UDPListener(addr, handler, buffer_size, self)
+
+ _thread = None
+ def start_thread(self, daemon=True, name=__name__):
+ self._start_lock.acquire()
+ try:
+ if self._thread is None:
+ self._thread = threading.Thread(target=self._loop, name=name)
+ self._thread.setDaemon(True)
+ self._thread.start()
+ finally:
+ self._start_lock.release()
+
+ def _loop(self):
+ timeout = 30.0
+ map = self._map
+ logger = logging.getLogger('zc.ngi.async.loop')
+
+ while map:
+ try:
+ asyncore.poll(timeout, map)
+ except:
+ logger.exception('loop error')
+ raise
+
+
class dispatcher(asyncore.dispatcher):
- def __init__(self, sock, addr, map=_map):
+ def __init__(self, sock, addr, implementation):
self.addr = addr
- asyncore.dispatcher.__init__(self, sock, map)
+ self.implementation = implementation
+ asyncore.dispatcher.__init__(self, sock, implementation._map)
def handle_error(self):
reason = sys.exc_info()[1]
@@ -66,8 +123,8 @@
self.close()
def close(self):
- self.del_channel(_map)
- self.socket.close()
+ self.del_channel(self._map)
+ self.implementation.call_from_thread(self.socket.close)
def writable(self):
return False
@@ -76,13 +133,13 @@
control = None
- def __init__(self, sock, addr, logger):
+ def __init__(self, sock, addr, logger, implementation):
self.__connected = True
self.__closed = None
self.__handler = None
self.__exception = None
self.__output = []
- dispatcher.__init__(self, sock, addr)
+ dispatcher.__init__(self, sock, addr, implementation)
self.logger = logger
def __nonzero__(self):
@@ -237,7 +294,7 @@
self.handle_close('socket error')
-class connector(dispatcher):
+class _Connector(dispatcher):
logger = logging.getLogger('zc.ngi.async.client')
@@ -257,22 +314,20 @@
_CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
_CONNECT_OK = (0, errno.EISCONN)
- def __init__(self, addr, handler):
+ def __init__(self, addr, handler, implementation):
self.__handler = handler
if isinstance(addr, str):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- dispatcher.__init__(self, sock, addr, _connectors)
+ dispatcher.__init__(self, sock, addr, implementation)
- notify_select()
-
- def connect(self):
if __debug__:
self.logger.debug('connecting to %s', self.addr)
- self.add_channel(_map)
+ # INVARIANT: we are called from the select thread!
+
try:
self.handle_write_event()
except:
@@ -304,11 +359,12 @@
self.handle_close(reason)
return
- self.del_channel(_map)
+ self.del_channel(self._map)
if __debug__:
self.logger.debug('outgoing connected %r', self.addr)
- connection = _Connection(self.socket, self.addr, self.logger)
+ connection = _Connection(self.socket, self.addr, self.logger,
+ self.implementation)
try:
self.__handler.connected(connection)
except:
@@ -342,23 +398,25 @@
# work around file-dispatcher bug
if map is None:
return
- assert (map is _map)
- asyncore.dispatcher.add_channel(self, _map)
+ assert (map is self._map)
+ asyncore.dispatcher.add_channel(self, self._map)
def handle_error(self):
reason = sys.exc_info()[1]
self.logger.exception('listener error')
self.close()
-class listener(BaseListener):
+class _Listener(BaseListener):
logger = logging.getLogger('zc.ngi.async.server')
- def __init__(self, addr, handler):
+ def __init__(self, addr, handler, implementation):
+ self.implementation = implementation
+ map = implementation._map
self.__handler = handler
self.__close_handler = None
self.__connections = {}
- asyncore.dispatcher.__init__(self)
+ asyncore.dispatcher.__init__(self, map=map)
if isinstance(addr, str):
family = socket.AF_UNIX
else:
@@ -372,7 +430,7 @@
except socket.error:
self.close()
raise
- self.add_channel(_map)
+ self.add_channel(map)
notify_select()
def handle_accept(self):
@@ -392,7 +450,7 @@
if __debug__:
self.logger.debug('incoming connection %r', addr)
- connection = _Connection(sock, addr, self.logger)
+ connection = _Connection(sock, addr, self.logger, self.implementation)
self.__connections[connection] = 1
connection.control = self
try:
@@ -412,8 +470,8 @@
def close(self, handler=None):
self.accepting = False
- self.del_channel(_map)
- self.socket.close()
+ self.del_channel(self._map)
+ self.implementation.call_from_thread(self.socket.close)
if handler is None:
for c in list(self.__connections):
c.handle_close("stopped")
@@ -422,15 +480,17 @@
else:
self.__close_handler = handler
-class udp_listener(BaseListener):
+class _UDPListener(BaseListener):
logger = logging.getLogger('zc.ngi.async.udpserver')
connected = True
- def __init__(self, addr, handler, buffer_size=4096):
+ def __init__(self, addr, handler, buffer_size, implementation):
+ self.implementation = implementation
+ map = implementation._map
self.__handler = handler
self.__buffer_size = buffer_size
- asyncore.dispatcher.__init__(self)
+ asyncore.dispatcher.__init__(self, map=map)
if isinstance(addr, str):
family = socket.AF_UNIX
else:
@@ -443,7 +503,7 @@
except socket.error:
self.close()
raise
- self.add_channel(_map)
+ self.add_channel(map)
notify_select()
def handle_read(self):
@@ -451,22 +511,11 @@
self.__handler(addr, message)
def close(self):
- self.del_channel(_map)
- self.socket.close()
+ self.del_channel(self._map)
+ self.implementation.call_from_thread(self.socket.close)
# udp uses GIL to get thread-safe socket management
_udp_socks = {socket.AF_INET: [], socket.AF_UNIX: []}
-def udp(address, message):
- if isinstance(address, str):
- family = socket.AF_UNIX
- else:
- family = socket.AF_INET
- try:
- sock = _udp_socks[family].pop()
- except IndexError:
- sock = socket.socket(family, socket.SOCK_DGRAM)
- sock.sendto(message, address)
- _udp_socks[family].append(sock)
# The following trigger code is greatly simplified from the Medusa
# trigger code.
@@ -476,6 +525,9 @@
logger = logging.getLogger('zc.ngi.async.trigger')
+ def __init__(self):
+ self.callbacks = []
+
def writable(self):
return 0
@@ -487,6 +539,13 @@
self.close()
def handle_read(self):
+ while self.callbacks:
+ callback = self.callbacks.pop(0)
+ try:
+ callback()
+ except:
+ self.logger.exception('Calling callback')
+
try:
self.recv(BUFFER_SIZE)
except socket.error:
@@ -495,12 +554,13 @@
if os.name == 'posix':
class _Trigger(_Triggerbase, asyncore.file_dispatcher):
- def __init__(self):
+ def __init__(self, map):
+ _Triggerbase.__init__(self)
self.__readfd, self.__writefd = os.pipe()
- asyncore.file_dispatcher.__init__(self, self.__readfd)
+ asyncore.file_dispatcher.__init__(self, self.__readfd, map)
def close(self):
- self.del_channel(_map)
+ self.del_channel(self._map)
os.close(self.__writefd)
os.close(self.__readfd)
@@ -511,15 +571,19 @@
def add_channel(self, map=None):
# work around file-dispatcher bug
- assert (map is None) or (map is _map)
- asyncore.dispatcher.add_channel(self, _map)
+ assert (map is None) or (map is self._map)
+ asyncore.dispatcher.add_channel(self, self._map)
else:
# Windows version; uses just sockets, because a pipe isn't select'able
# on Windows.
+ class BindError(Exception):
+ pass
+
class _Trigger(_Triggerbase, asyncore.dispatcher):
- def __init__(self):
+ def __init__(self, map):
+ _Triggerbase.__init__(self)
# Get a pair of connected sockets. The trigger is the 'w'
# end of the pair, which is connected to 'r'. 'r' is put
@@ -571,10 +635,10 @@
r, addr = a.accept() # r becomes asyncore's (self.)socket
a.close()
self.trigger = w
- asyncore.dispatcher.__init__(self, r, _map)
+ asyncore.dispatcher.__init__(self, r, map)
def close(self):
- self.del_channel(_map)
+ self.del_channel(self._map)
# self.socket is r, and self.trigger is w, from __init__
self.socket.close()
self.trigger.close()
@@ -584,27 +648,13 @@
self.logger.debug('notify select %s', pid)
self.trigger.send('x')
-_trigger = _Trigger()
+_select_implementation = SelectImplementation()
-notify_select = _trigger.pull_trigger
-
-def loop():
- timeout = 30.0
- map = _map
- connectors = _connectors
- logger = logging.getLogger('zc.ngi.async.loop')
-
- while map:
- for f in list(connectors):
- c = connectors.pop(f)
- c.connect()
-
- try:
- asyncore.poll(timeout, map)
- except:
- logger.exception('loop error')
- raise
-
-_thread = threading.Thread(target=loop, name=__name__)
-_thread.setDaemon(True)
-_thread.start()
+call_from_thread = _select_implementation.call_from_thread
+connect = connector = _select_implementation.connect
+listener = _select_implementation.listener
+notify_select = _select_implementation.notify_select
+start_thread = _select_implementation.start_thread
+udp = _select_implementation.udp
+udp_listener = _select_implementation.udp_listener
+_map = _select_implementation._map
Modified: zc.ngi/branches/jim-thready/src/zc/ngi/interfaces.py
===================================================================
--- zc.ngi/branches/jim-thready/src/zc/ngi/interfaces.py 2009-10-02 20:56:47 UTC (rev 104757)
+++ zc.ngi/branches/jim-thready/src/zc/ngi/interfaces.py 2009-10-02 22:19:25 UTC (rev 104758)
@@ -83,7 +83,7 @@
"""Send a UDP message
"""
- def udp_listen(address, handler, buffer_size=4096):
+ def udp_listener(address, handler, buffer_size=4096):
"""Listen for incoming UDP messages
When a message is received, call the handler with the message.
Modified: zc.ngi/branches/jim-thready/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/branches/jim-thready/src/zc/ngi/tests.py 2009-10-02 20:56:47 UTC (rev 104757)
+++ zc.ngi/branches/jim-thready/src/zc/ngi/tests.py 2009-10-02 22:19:25 UTC (rev 104758)
@@ -21,6 +21,8 @@
import zc.ngi.async
import zc.ngi.wordcount
+zc.ngi.async.start_thread()
+
def test_async_cannot_connect():
"""Let's make sure that the connector handles connection failures correctly
More information about the checkins mailing list