[Checkins] SVN: zc.ngi/branches/jim-thready/src/zc/ngi/ Made the async implementation into an instance so I can play with

Jim Fulton jim at zope.com
Fri Oct 2 18:19:25 EDT 2009


Log message for revision 104758:
  Made the async implementation into an instance so I can play with
  having more than one.
  

Changed:
  U   zc.ngi/branches/jim-thready/src/zc/ngi/async.py
  U   zc.ngi/branches/jim-thready/src/zc/ngi/interfaces.py
  U   zc.ngi/branches/jim-thready/src/zc/ngi/tests.py

-=-
Modified: zc.ngi/branches/jim-thready/src/zc/ngi/async.py
===================================================================
--- zc.ngi/branches/jim-thready/src/zc/ngi/async.py	2009-10-02 20:56:47 UTC (rev 104757)
+++ zc.ngi/branches/jim-thready/src/zc/ngi/async.py	2009-10-02 22:19:25 UTC (rev 104758)
@@ -20,19 +20,14 @@
 import errno
 import logging
 import os
-import select
 import socket
 import sys
 import threading
-import time
 
 import zc.ngi
 
 pid = os.getpid()
 
-_map = {}
-_connectors = {}
-
 expected_socket_read_errors = {
     errno.EWOULDBLOCK: 0,
     errno.EAGAIN: 0,
@@ -48,11 +43,73 @@
 
 BUFFER_SIZE = 8*1024
 
+class SelectImplementation:
+
+    def __init__(self):
+        self._map = {}
+        self._trigger = _Trigger(self._map)
+        self.notify_select = self._trigger.pull_trigger
+        self._start_lock = threading.Lock()
+
+    def call_from_thread(self, func):
+        self._trigger.callbacks.append(func)
+        self.notify_select()
+
+    def connect(self, addr, handler):
+        self.start_thread()
+        self.call_from_thread(lambda : _Connector(addr, handler, self))
+
+    def listener(self, addr, handler):
+        self.start_thread()
+        return _Listener(addr, handler, self)
+
+    def udp(self, address, message):
+        if isinstance(address, str):
+            family = socket.AF_UNIX
+        else:
+            family = socket.AF_INET
+        try:
+            sock = _udp_socks[family].pop()
+        except IndexError:
+            sock = socket.socket(family, socket.SOCK_DGRAM)
+
+        sock.sendto(message, address)
+        _udp_socks[family].append(sock)
+
+    def udp_listener(self, addr, handler, buffer_size=4096):
+        self.start_thread()
+        return _UDPListener(addr, handler, buffer_size, self)
+
+    _thread = None
+    def start_thread(self, daemon=True, name=__name__):
+        self._start_lock.acquire()
+        try:
+            if self._thread is None:
+                self._thread = threading.Thread(target=self._loop, name=name)
+                self._thread.setDaemon(True)
+                self._thread.start()
+        finally:
+            self._start_lock.release()
+
+    def _loop(self):
+        timeout = 30.0
+        map = self._map
+        logger = logging.getLogger('zc.ngi.async.loop')
+
+        while map:
+            try:
+                asyncore.poll(timeout, map)
+            except:
+                logger.exception('loop error')
+                raise
+
+
 class dispatcher(asyncore.dispatcher):
 
-    def __init__(self, sock, addr, map=_map):
+    def __init__(self, sock, addr, implementation):
         self.addr = addr
-        asyncore.dispatcher.__init__(self, sock, map)
+        self.implementation = implementation
+        asyncore.dispatcher.__init__(self, sock, implementation._map)
 
     def handle_error(self):
         reason = sys.exc_info()[1]
@@ -66,8 +123,8 @@
             self.close()
 
     def close(self):
-        self.del_channel(_map)
-        self.socket.close()
+        self.del_channel(self._map)
+        self.implementation.call_from_thread(self.socket.close)
 
     def writable(self):
         return False
@@ -76,13 +133,13 @@
 
     control = None
 
-    def __init__(self, sock, addr, logger):
+    def __init__(self, sock, addr, logger, implementation):
         self.__connected = True
         self.__closed = None
         self.__handler = None
         self.__exception = None
         self.__output = []
-        dispatcher.__init__(self, sock, addr)
+        dispatcher.__init__(self, sock, addr, implementation)
         self.logger = logger
 
     def __nonzero__(self):
@@ -237,7 +294,7 @@
         self.handle_close('socket error')
 
 
-class connector(dispatcher):
+class _Connector(dispatcher):
 
     logger = logging.getLogger('zc.ngi.async.client')
 
@@ -257,22 +314,20 @@
         _CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
         _CONNECT_OK          = (0, errno.EISCONN)
 
-    def __init__(self, addr, handler):
+    def __init__(self, addr, handler, implementation):
         self.__handler = handler
         if isinstance(addr, str):
             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         else:
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
-        dispatcher.__init__(self, sock, addr, _connectors)
+        dispatcher.__init__(self, sock, addr, implementation)
 
-        notify_select()
-
-    def connect(self):
         if __debug__:
             self.logger.debug('connecting to %s', self.addr)
 
-        self.add_channel(_map)
+        # INVARIANT: we are called from the select thread!
+
         try:
             self.handle_write_event()
         except:
@@ -304,11 +359,12 @@
             self.handle_close(reason)
             return
 
-        self.del_channel(_map)
+        self.del_channel(self._map)
         if __debug__:
             self.logger.debug('outgoing connected %r', self.addr)
 
-        connection = _Connection(self.socket, self.addr, self.logger)
+        connection = _Connection(self.socket, self.addr, self.logger,
+                                 self.implementation)
         try:
             self.__handler.connected(connection)
         except:
@@ -342,23 +398,25 @@
         # work around file-dispatcher bug
         if map is None:
             return
-        assert (map is _map)
-        asyncore.dispatcher.add_channel(self, _map)
+        assert (map is self._map)
+        asyncore.dispatcher.add_channel(self, self._map)
 
     def handle_error(self):
         reason = sys.exc_info()[1]
         self.logger.exception('listener error')
         self.close()
 
-class listener(BaseListener):
+class _Listener(BaseListener):
 
     logger = logging.getLogger('zc.ngi.async.server')
 
-    def __init__(self, addr, handler):
+    def __init__(self, addr, handler, implementation):
+        self.implementation = implementation
+        map = implementation._map
         self.__handler = handler
         self.__close_handler = None
         self.__connections = {}
-        asyncore.dispatcher.__init__(self)
+        asyncore.dispatcher.__init__(self, map=map)
         if isinstance(addr, str):
             family = socket.AF_UNIX
         else:
@@ -372,7 +430,7 @@
         except socket.error:
             self.close()
             raise
-        self.add_channel(_map)
+        self.add_channel(map)
         notify_select()
 
     def handle_accept(self):
@@ -392,7 +450,7 @@
         if __debug__:
             self.logger.debug('incoming connection %r', addr)
 
-        connection = _Connection(sock, addr, self.logger)
+        connection = _Connection(sock, addr, self.logger, self.implementation)
         self.__connections[connection] = 1
         connection.control = self
         try:
@@ -412,8 +470,8 @@
 
     def close(self, handler=None):
         self.accepting = False
-        self.del_channel(_map)
-        self.socket.close()
+        self.del_channel(self._map)
+        self.implementation.call_from_thread(self.socket.close)
         if handler is None:
             for c in list(self.__connections):
                 c.handle_close("stopped")
@@ -422,15 +480,17 @@
         else:
             self.__close_handler = handler
 
-class udp_listener(BaseListener):
+class _UDPListener(BaseListener):
 
     logger = logging.getLogger('zc.ngi.async.udpserver')
     connected = True
 
-    def __init__(self, addr, handler, buffer_size=4096):
+    def __init__(self, addr, handler, buffer_size, implementation):
+        self.implementation = implementation
+        map = implementation._map
         self.__handler = handler
         self.__buffer_size = buffer_size
-        asyncore.dispatcher.__init__(self)
+        asyncore.dispatcher.__init__(self, map=map)
         if isinstance(addr, str):
             family = socket.AF_UNIX
         else:
@@ -443,7 +503,7 @@
         except socket.error:
             self.close()
             raise
-        self.add_channel(_map)
+        self.add_channel(map)
         notify_select()
 
     def handle_read(self):
@@ -451,22 +511,11 @@
         self.__handler(addr, message)
 
     def close(self):
-        self.del_channel(_map)
-        self.socket.close()
+        self.del_channel(self._map)
+        self.implementation.call_from_thread(self.socket.close)
 
 # udp uses GIL to get thread-safe socket management
 _udp_socks = {socket.AF_INET: [], socket.AF_UNIX: []}
-def udp(address, message):
-    if isinstance(address, str):
-        family = socket.AF_UNIX
-    else:
-        family = socket.AF_INET
-    try:
-        sock = _udp_socks[family].pop()
-    except IndexError:
-        sock = socket.socket(family, socket.SOCK_DGRAM)
-    sock.sendto(message, address)
-    _udp_socks[family].append(sock)
 
 # The following trigger code is greatly simplified from the Medusa
 # trigger code.
@@ -476,6 +525,9 @@
 
     logger = logging.getLogger('zc.ngi.async.trigger')
 
+    def __init__(self):
+        self.callbacks = []
+
     def writable(self):
         return 0
 
@@ -487,6 +539,13 @@
         self.close()
 
     def handle_read(self):
+        while self.callbacks:
+            callback = self.callbacks.pop(0)
+            try:
+                callback()
+            except:
+                self.logger.exception('Calling callback')
+
         try:
             self.recv(BUFFER_SIZE)
         except socket.error:
@@ -495,12 +554,13 @@
 if os.name == 'posix':
 
     class _Trigger(_Triggerbase, asyncore.file_dispatcher):
-        def __init__(self):
+        def __init__(self, map):
+            _Triggerbase.__init__(self)
             self.__readfd, self.__writefd = os.pipe()
-            asyncore.file_dispatcher.__init__(self, self.__readfd)
+            asyncore.file_dispatcher.__init__(self, self.__readfd, map)
 
         def close(self):
-            self.del_channel(_map)
+            self.del_channel(self._map)
             os.close(self.__writefd)
             os.close(self.__readfd)
 
@@ -511,15 +571,19 @@
 
         def add_channel(self, map=None):
             # work around file-dispatcher bug
-            assert (map is None) or (map is _map)
-            asyncore.dispatcher.add_channel(self, _map)
+            assert (map is None) or (map is self._map)
+            asyncore.dispatcher.add_channel(self, self._map)
 
 else:
     # Windows version; uses just sockets, because a pipe isn't select'able
     # on Windows.
 
+    class BindError(Exception):
+        pass
+
     class _Trigger(_Triggerbase, asyncore.dispatcher):
-        def __init__(self):
+        def __init__(self, map):
+            _Triggerbase.__init__(self)
 
             # Get a pair of connected sockets.  The trigger is the 'w'
             # end of the pair, which is connected to 'r'.  'r' is put
@@ -571,10 +635,10 @@
             r, addr = a.accept()  # r becomes asyncore's (self.)socket
             a.close()
             self.trigger = w
-            asyncore.dispatcher.__init__(self, r, _map)
+            asyncore.dispatcher.__init__(self, r, map)
 
         def close(self):
-            self.del_channel(_map)
+            self.del_channel(self._map)
             # self.socket is r, and self.trigger is w, from __init__
             self.socket.close()
             self.trigger.close()
@@ -584,27 +648,13 @@
                 self.logger.debug('notify select %s', pid)
             self.trigger.send('x')
 
-_trigger = _Trigger()
+_select_implementation = SelectImplementation()
 
-notify_select = _trigger.pull_trigger
-
-def loop():
-    timeout = 30.0
-    map = _map
-    connectors = _connectors
-    logger = logging.getLogger('zc.ngi.async.loop')
-
-    while map:
-        for f in list(connectors):
-            c = connectors.pop(f)
-            c.connect()
-
-        try:
-            asyncore.poll(timeout, map)
-        except:
-            logger.exception('loop error')
-            raise
-
-_thread = threading.Thread(target=loop, name=__name__)
-_thread.setDaemon(True)
-_thread.start()
+call_from_thread = _select_implementation.call_from_thread
+connect = connector = _select_implementation.connect
+listener = _select_implementation.listener
+notify_select = _select_implementation.notify_select
+start_thread = _select_implementation.start_thread
+udp = _select_implementation.udp
+udp_listener = _select_implementation.udp_listener
+_map = _select_implementation._map

Modified: zc.ngi/branches/jim-thready/src/zc/ngi/interfaces.py
===================================================================
--- zc.ngi/branches/jim-thready/src/zc/ngi/interfaces.py	2009-10-02 20:56:47 UTC (rev 104757)
+++ zc.ngi/branches/jim-thready/src/zc/ngi/interfaces.py	2009-10-02 22:19:25 UTC (rev 104758)
@@ -83,7 +83,7 @@
         """Send a UDP message
         """
 
-    def udp_listen(address, handler, buffer_size=4096):
+    def udp_listener(address, handler, buffer_size=4096):
         """Listen for incoming UDP messages
 
         When a message is received, call the handler with the message.

Modified: zc.ngi/branches/jim-thready/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/branches/jim-thready/src/zc/ngi/tests.py	2009-10-02 20:56:47 UTC (rev 104757)
+++ zc.ngi/branches/jim-thready/src/zc/ngi/tests.py	2009-10-02 22:19:25 UTC (rev 104758)
@@ -21,6 +21,8 @@
 import zc.ngi.async
 import zc.ngi.wordcount
 
+zc.ngi.async.start_thread()
+
 def test_async_cannot_connect():
     """Let's make sure that the connector handles connection failures correctly
 



More information about the checkins mailing list