[Checkins] SVN: zc.ngi/trunk/ Added support for running multiple ``async`` implementations in

Jim Fulton jim at zope.com
Mon Jun 21 07:08:59 EDT 2010


Log message for revision 113724:
  Added support for running multiple ``async`` implementations in
  separate threads. This is useful in applications with fewer network
  connections and with handlers that tend to perform long-lasting
  computations that would be unacceptable with a single select loop.
  

Changed:
  U   zc.ngi/trunk/README.txt
  U   zc.ngi/trunk/src/zc/ngi/async.py
  U   zc.ngi/trunk/src/zc/ngi/tests.py

-=-
Modified: zc.ngi/trunk/README.txt
===================================================================
--- zc.ngi/trunk/README.txt	2010-06-21 10:37:59 UTC (rev 113723)
+++ zc.ngi/trunk/README.txt	2010-06-21 11:08:59 UTC (rev 113724)
@@ -36,6 +36,11 @@
   ``zc.ngi.blocking.request``.  Other older blocking APIs are
   deprecated.
 
+- Added support for running multiple ``async`` implementations in
+  separate threads. This is useful in applications with fewer network
+  connections and with handlers that tend to perform long-lasting
+  computations that would be unacceptable with a single select loop.
+
 - Dropped support for Python 2.4.
 
 Bugs Fixed:

Modified: zc.ngi/trunk/src/zc/ngi/async.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.py	2010-06-21 10:37:59 UTC (rev 113723)
+++ zc.ngi/trunk/src/zc/ngi/async.py	2010-06-21 11:08:59 UTC (rev 113724)
@@ -12,8 +12,6 @@
 #
 ##############################################################################
 """Asyncore-based implementation of the NGI
-
-$Id$
 """
 
 import asyncore
@@ -29,9 +27,6 @@
 pid = os.getpid()
 is_win32 = sys.platform == 'win32'
 
-_map = {}
-_connectors = {}
-
 expected_socket_read_errors = {
     errno.EWOULDBLOCK: 0,
     errno.EAGAIN: 0,
@@ -47,11 +42,83 @@
 
 BUFFER_SIZE = 8*1024
 
+class Implementation:
+
+    def __init__(self):
+        self._map = {}
+        self._trigger = _Trigger(self._map)
+        self.notify_select = self._trigger.pull_trigger
+        self._start_lock = threading.Lock()
+
+    def call_from_thread(self, func):
+        self._trigger.callbacks.append(func)
+        self.notify_select()
+
+    def connect(self, addr, handler):
+        self.start_thread()
+        self.call_from_thread(lambda : _Connector(addr, handler, self))
+
+    def listener(self, addr, handler):
+        self.start_thread()
+        return _Listener(addr, handler, self)
+
+    def udp(self, address, message):
+        if isinstance(address, str):
+            family = socket.AF_UNIX
+        else:
+            family = socket.AF_INET
+        try:
+            sock = _udp_socks[family].pop()
+        except IndexError:
+            sock = socket.socket(family, socket.SOCK_DGRAM)
+
+        sock.sendto(message, address)
+        _udp_socks[family].append(sock)
+
+    def udp_listener(self, addr, handler, buffer_size=4096):
+        self.start_thread()
+        return _UDPListener(addr, handler, buffer_size, self)
+
+    _thread = None
+    def start_thread(self, daemon=True, name=__name__):
+        self._start_lock.acquire()
+        try:
+            if self._thread is None:
+                self._thread = threading.Thread(target=self._loop, name=name)
+                self._thread.setDaemon(True)
+                self._thread.start()
+        finally:
+            self._start_lock.release()
+
+    def _loop(self):
+        timeout = 30.0
+        map = self._map
+        logger = logging.getLogger('zc.ngi.async.loop')
+
+        while map:
+            try:
+                asyncore.poll(timeout, map)
+            except:
+                traceback.print_exception(*sys.exc_info())
+                #logger.exception('loop error')
+                raise
+
+    def cleanup_map(self):
+        for c in self._map.values():
+            if isinstance(c, _Trigger):
+                continue
+            try:
+                del self._map[c.fileno()]
+            except KeyError:
+                pass
+            c.close()
+
 class dispatcher(asyncore.dispatcher):
 
-    def __init__(self, sock, addr, map=_map):
+    def __init__(self, sock, addr, implementation):
         self.addr = addr
-        asyncore.dispatcher.__init__(self, sock, map)
+        self.implementation = implementation
+        asyncore.dispatcher.__init__(self, sock, implementation._map)
 
     def handle_error(self):
         reason = sys.exc_info()[1]
@@ -65,8 +132,8 @@
             self.close()
 
     def close(self):
-        self.del_channel(_map)
-        self.socket.close()
+        self.del_channel(self._map)
+        self.implementation.call_from_thread(self.socket.close)
 
     def writable(self):
         return False
@@ -75,13 +142,13 @@
 
     control = None
 
-    def __init__(self, sock, addr, logger):
+    def __init__(self, sock, addr, logger, implementation):
         self.__connected = True
         self.__closed = None
         self.__handler = None
         self.__iterator_exception = None
         self.__output = []
-        dispatcher.__init__(self, sock, addr)
+        dispatcher.__init__(self, sock, addr, implementation)
         self.logger = logger
 
     def __nonzero__(self):
@@ -114,14 +181,14 @@
             self.logger.debug('write %r', data)
         assert isinstance(data, str) or (data is zc.ngi.END_OF_DATA)
         self.__output.append(data)
-        notify_select()
+        self.implementation.notify_select()
 
     def writelines(self, data):
         if __debug__:
             self.logger.debug('writelines %r', data)
         assert not isinstance(data, str), "writelines does not accept strings"
         self.__output.append(iter(data))
-        notify_select()
+        self.implementation.notify_select()
 
     def close(self):
         self.__connected = False
@@ -129,7 +196,7 @@
         dispatcher.close(self)
         if self.control is not None:
             self.control.closed(self)
-        notify_select()
+        self.implementation.notify_select()
 
     def readable(self):
         return self.__handler is not None
@@ -231,9 +298,8 @@
     def handle_expt(self):
         self.handle_close('socket error')
 
+class _Connector(dispatcher):
 
-class connector(dispatcher):
-
     logger = logging.getLogger('zc.ngi.async.client')
 
     # When trying to do a connect on a non-blocking socket, some outcomes
@@ -252,24 +318,20 @@
         _CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
         _CONNECT_OK          = (0, errno.EISCONN)
 
-    def __init__(self, addr, handler):
-        if not _thread:
-            start_thread()
+    def __init__(self, addr, handler, implementation):
         self.__handler = handler
         if isinstance(addr, str):
             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         else:
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
-        dispatcher.__init__(self, sock, addr, _connectors)
+        dispatcher.__init__(self, sock, addr, implementation)
 
-        notify_select()
-
-    def connect(self):
         if __debug__:
             self.logger.debug('connecting to %s', self.addr)
 
-        self.add_channel(_map)
+        # INVARIANT: we are called from the select thread!
+
         try:
             self.handle_write_event()
         except:
@@ -301,11 +363,12 @@
             self.handle_close(reason)
             return
 
-        self.del_channel(_map)
+        self.del_channel(self._map)
         if __debug__:
             self.logger.debug('outgoing connected %r', self.addr)
 
-        connection = _Connection(self.socket, self.addr, self.logger)
+        connection = _Connection(self.socket, self.addr, self.logger,
+                                 self.implementation)
         try:
             self.__handler.connected(connection)
         except:
@@ -327,9 +390,6 @@
     def handle_expt(self):
         self.handle_close('connection failed')
 
-def connect(*args):
-    connector(*args)
-
 class BaseListener(asyncore.dispatcher):
 
     def writable(self):
@@ -339,25 +399,26 @@
         # work around file-dispatcher bug
         if map is None:
             return
-        assert (map is _map)
-        asyncore.dispatcher.add_channel(self, _map)
+        assert (map is self._map)
+        asyncore.dispatcher.add_channel(self, self._map)
 
     def handle_error(self):
         reason = sys.exc_info()[1]
-        self.logger.exception('listener error')
+        #self.logger.exception('listener error')
+        traceback.print_exception(*sys.exc_info())
         self.close()
 
-class listener(BaseListener):
+class _Listener(BaseListener):
 
     logger = logging.getLogger('zc.ngi.async.server')
 
-    def __init__(self, addr, handler):
-        if not _thread:
-            start_thread()
+    def __init__(self, addr, handler, implementation):
+        self.implementation = implementation
+        map = implementation._map
         self.__handler = handler
         self.__close_handler = None
         self.__connections = {}
-        asyncore.dispatcher.__init__(self)
+        asyncore.dispatcher.__init__(self, map=map)
         if isinstance(addr, str):
             family = socket.AF_UNIX
         else:
@@ -391,9 +452,10 @@
             self.close()
             self.logger.warn("unable to listen on %r", addr)
             raise
-        self.add_channel(_map)
+
+        self.add_channel(map)
         self.address = addr
-        notify_select()
+        self.implementation.notify_select()
 
     def handle_accept(self):
         if not self.accepting:
@@ -407,12 +469,13 @@
                 # didn't get anything. Hm. Ignore.
                 return
         except socket.error, msg:
-            self.logger.exception("accepted failed: %s", msg)
+            traceback.print_exception(*sys.exc_info())
+            #self.logger.exception("accepted failed: %s", msg)
             return
         if __debug__:
             self.logger.debug('incoming connection %r', addr)
 
-        connection = _Connection(sock, addr, self.logger)
+        connection = _Connection(sock, addr, self.logger, self.implementation)
         self.__connections[connection] = 1
         connection.control = self
         try:
@@ -432,8 +495,8 @@
 
     def close(self, handler=None):
         self.accepting = False
-        self.del_channel(_map)
-        call_from_thread(self.socket.close)
+        self.del_channel(self._map)
+        self.implementation.call_from_thread(self.socket.close)
         if handler is None:
             for c in list(self.__connections):
                 c.handle_close("stopped")
@@ -442,21 +505,21 @@
         else:
             self.__close_handler = handler
 
-    # convenience method made possible by storaing out address:
+    # convenience method made possible by storing our address:
     def connect(self, handler):
-        connect(self.address, handler)
+        self.implementation.connect(self.address, handler)
 
-class udp_listener(BaseListener):
+class _UDPListener(BaseListener):
 
     logger = logging.getLogger('zc.ngi.async.udpserver')
     connected = True
 
-    def __init__(self, addr, handler, buffer_size=4096):
-        if not _thread:
-            start_thread()
+    def __init__(self, addr, handler, buffer_size, implementation):
+        self.implementation = implementation
+        map = implementation._map
         self.__handler = handler
         self.__buffer_size = buffer_size
-        asyncore.dispatcher.__init__(self)
+        asyncore.dispatcher.__init__(self, map=map)
         if isinstance(addr, str):
             family = socket.AF_UNIX
         else:
@@ -471,33 +534,22 @@
             self.close()
             self.logger.warn("unable to listen on udp %r", addr)
             raise
-        self.add_channel(_map)
-        notify_select()
+        self.add_channel(map)
+        self.implementation.notify_select()
 
     def handle_read(self):
         message, addr = self.recvfrom(self.__buffer_size)
         self.__handler(addr, message)
 
     def close(self):
-        self.del_channel(_map)
-        call_from_thread(self.socket.close)
+        self.del_channel(self._map)
+        self.implementation.call_from_thread(self.socket.close)
 
 # udp uses GIL to get thread-safe socket management
 if is_win32:
     _udp_socks = {socket.AF_INET: []}
 else:
     _udp_socks = {socket.AF_INET: [], socket.AF_UNIX: []}
-def udp(address, message):
-    if isinstance(address, str):
-        family = socket.AF_UNIX
-    else:
-        family = socket.AF_INET
-    try:
-        sock = _udp_socks[family].pop()
-    except IndexError:
-        sock = socket.socket(family, socket.SOCK_DGRAM)
-    sock.sendto(message, address)
-    _udp_socks[family].append(sock)
 
 # The following trigger code is greatly simplified from the Medusa
 # trigger code.
@@ -536,15 +588,17 @@
 if os.name == 'posix':
 
     class _Trigger(_Triggerbase, asyncore.file_dispatcher):
-        def __init__(self):
+        def __init__(self, map):
             _Triggerbase.__init__(self)
-            self.__readfd, self.__writefd = os.pipe()
-            asyncore.file_dispatcher.__init__(self, self.__readfd)
+            r, self.__writefd = os.pipe()
+            asyncore.file_dispatcher.__init__(self, r, map)
 
+            # file_dispatcher dups r, so we don't need it any more
+            os.close(r)
+
         def close(self):
-            self.del_channel(_map)
             os.close(self.__writefd)
-            os.close(self.__readfd)
+            asyncore.file_dispatcher.close(self)
 
         def pull_trigger(self):
             if __debug__:
@@ -553,16 +607,20 @@
 
         def add_channel(self, map=None):
             # work around file-dispatcher bug
-            assert (map is None) or (map is _map)
-            asyncore.dispatcher.add_channel(self, _map)
+            assert (map is None) or (map is self._map)
+            asyncore.dispatcher.add_channel(self, self._map)
 
 else:
     # Windows version; uses just sockets, because a pipe isn't select'able
     # on Windows.
 
+    class BindError(Exception):
+        pass
+
     class _Trigger(_Triggerbase, asyncore.dispatcher):
-        def __init__(self):
+        def __init__(self, map):
             _Triggerbase.__init__(self)
+            _Triggerbase.__init__(self)
 
             # Get a pair of connected sockets.  The trigger is the 'w'
             # end of the pair, which is connected to 'r'.  'r' is put
@@ -614,10 +672,10 @@
             r, addr = a.accept()  # r becomes asyncore's (self.)socket
             a.close()
             self.trigger = w
-            asyncore.dispatcher.__init__(self, r, _map)
+            asyncore.dispatcher.__init__(self, r, map)
 
         def close(self):
-            self.del_channel(_map)
+            self.del_channel(self._map)
             # self.socket is r, and self.trigger is w, from __init__
             self.socket.close()
             self.trigger.close()
@@ -627,43 +685,13 @@
                 self.logger.debug('notify select %s', pid)
             self.trigger.send('x')
 
-_trigger = _Trigger()
+_select_implementation = Implementation()
 
-notify_select = _trigger.pull_trigger
-
-def call_from_thread(func):
-    _trigger.callbacks.append(func)
-    notify_select()
-
-def loop():
-    timeout = 30.0
-    map = _map
-    connectors = _connectors
-    logger = logging.getLogger('zc.ngi.async.loop')
-    list_ = list
-
-    while map:
-        for f in list_(connectors):
-            c = connectors.pop(f)
-            c.connect()
-
-        try:
-            asyncore.poll(timeout, map)
-        except:
-            print sys.exc_info()[0]
-            logger.exception('loop error')
-            raise
-
-_thread = None
-_start_lock = threading.Lock()
-def start_thread(daemon=True):
-    global _thread
-    _start_lock.acquire()
-    try:
-        if _thread is not None:
-            return
-        _thread = threading.Thread(target=loop, name=__name__)
-        _thread.setDaemon(daemon)
-        _thread.start()
-    finally:
-        _start_lock.release()
+call_from_thread = _select_implementation.call_from_thread
+connect = connector = _select_implementation.connect
+listener = _select_implementation.listener
+start_thread = _select_implementation.start_thread
+udp = _select_implementation.udp
+udp_listener = _select_implementation.udp_listener
+_map = _select_implementation._map
+cleanup_map = _select_implementation.cleanup_map

Modified: zc.ngi/trunk/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/tests.py	2010-06-21 10:37:59 UTC (rev 113723)
+++ zc.ngi/trunk/src/zc/ngi/tests.py	2010-06-21 11:08:59 UTC (rev 113724)
@@ -331,6 +331,9 @@
     #import logging
     #logging.getLogger().addHandler(logging.StreamHandler())
 
+    # clean up the map.
+    zc.ngi.async.cleanup_map()
+
     # See if we can break the main loop before running the async test
 
     # Connect to bad port with bad handler



More information about the checkins mailing list