[Checkins] SVN: zc.ngi/trunk/ Added an experimental listener option to run each client (server

Jim Fulton jim at zope.com
Wed Jul 14 10:37:25 EDT 2010


Log message for revision 114745:
  Added an experimental listener option to run each client (server
  connection) in a separate thread.
  

Changed:
  U   zc.ngi/trunk/README.txt
  U   zc.ngi/trunk/src/zc/ngi/async.py
  U   zc.ngi/trunk/src/zc/ngi/tests.py

-=-
Modified: zc.ngi/trunk/README.txt
===================================================================
--- zc.ngi/trunk/README.txt	2010-07-14 14:24:25 UTC (rev 114744)
+++ zc.ngi/trunk/README.txt	2010-07-14 14:37:25 UTC (rev 114745)
@@ -20,6 +20,15 @@
 *******
 
 ====================
+2.0.0a2 (2010-07-??)
+====================
+
+New Features:
+
+- There's a new experimental zc.ngi.async.Implementation.listener
+  option to run each client (server connection) in its own thread.
+
+====================
 2.0.0a1 (2010-07-08)
 ====================
 

Modified: zc.ngi/trunk/src/zc/ngi/async.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.py	2010-07-14 14:24:25 UTC (rev 114744)
+++ zc.ngi/trunk/src/zc/ngi/async.py	2010-07-14 14:37:25 UTC (rev 114745)
@@ -21,6 +21,7 @@
 import os
 import socket
 import sys
+import thread
 import threading
 import time
 import warnings
@@ -57,7 +58,11 @@
         self._callbacks = []
         self._start_lock = threading.Lock()
 
+    thread_ident = None
     def call_from_thread(self, func):
+        if thread.get_ident() == self.thread_ident:
+            func()
+            return
         self._callbacks.append(func)
         self.notify_select()
         self.start_thread()
@@ -69,8 +74,8 @@
         self.call_from_thread(lambda : _Connector(addr, handler, self))
         self.start_thread()
 
-    def listener(self, addr, handler):
-        result = _Listener(addr, handler, self)
+    def listener(self, addr, handler, thready=False):
+        result = _Listener(addr, handler, self, thready)
         self.start_thread()
         return result
 
@@ -93,7 +98,7 @@
         return result
 
     _thread = None
-    def start_thread(self, daemon=True):
+    def start_thread(self):
         with self._start_lock:
             if self._thread is None:
                 self._thread = threading.Thread(
@@ -111,6 +116,7 @@
             raise zc.ngi.interfaces.Timeout
 
     def loop(self, timeout=None):
+        self.thread_ident = thread.get_ident()
         if timeout is not None:
             deadline = time.time() + timeout
         else:
@@ -156,6 +162,7 @@
                 if timeout <= 0:
                     raise zc.ngi.interfaces.Timeout
         finally:
+            del self.thread_ident
             del self.notify_select
             trigger.close()
 
@@ -380,9 +387,9 @@
 class _ServerConnection(_Connection):
     zc.ngi.interfaces.implements(zc.ngi.interfaces.IServerConnection)
 
-    def __init__(self, sock, addr, logger, listener):
+    def __init__(self, sock, addr, logger, listener, implementation):
         self.control = listener
-        _Connection.__init__(self, sock, addr, logger, listener.implementation)
+        _Connection.__init__(self, sock, addr, logger, implementation)
 
     def close(self):
         _Connection.close(self)
@@ -503,8 +510,7 @@
 
     def handle_error(self):
         reason = sys.exc_info()[1]
-        #self.logger.exception('listener error')
-        traceback.print_exception(*sys.exc_info())
+        self.logger.exception('listener error')
         self.close()
         self.implementation.handle_error()
 
@@ -513,10 +519,11 @@
 
     logger = logging.getLogger('zc.ngi.async.server')
 
-    def __init__(self, addr, handler, implementation):
+    def __init__(self, addr, handler, implementation, thready):
         self.__handler = handler
         self.__close_handler = None
-        self.__connections = {}
+        self._thready = thready
+        self.__connections = set()
         BaseListener.__init__(self, implementation)
         if isinstance(addr, str):
             family = socket.AF_UNIX
@@ -574,20 +581,31 @@
         if __debug__:
             self.logger.debug('incoming connection %r', addr)
 
-        connection = _ServerConnection(sock, addr, self.logger, self)
-        self.__connections[connection] = 1
-        try:
-            self.__handler(connection)
-        except:
-            self.logger.exception("server handler failed")
-            self.close()
+        if self._thready:
+            impl = Implementation(name="%r client" % (self.address,))
+        else:
+            impl = self.implementation
+        connection = _ServerConnection(sock, addr, self.logger, self, impl)
+        self.__connections.add(connection)
 
+        @impl.call_from_thread
+        def _():
+            try:
+                self.__handler(connection)
+            except:
+                self.logger.exception("server handler failed")
+                self.close()
+
+        if impl is not self.implementation:
+            impl.start_thread()
+
+
     def connections(self):
         return iter(self.__connections)
 
     def closed(self, connection):
         if connection in self.__connections:
-            del self.__connections[connection]
+            self.__connections.remove(connection)
             if not self.__connections and self.__close_handler:
                 self.__close_handler(self)
 

Modified: zc.ngi/trunk/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/tests.py	2010-07-14 14:24:25 UTC (rev 114744)
+++ zc.ngi/trunk/src/zc/ngi/tests.py	2010-07-14 14:37:25 UTC (rev 114745)
@@ -479,6 +479,80 @@
 
     """
 
+def EXPERIMENTAL_thready_async_servers():
+    r"""
+    When creating a listener with a zc.ngi.async.Implementation, you can
+    pass a thready keyword option to cause each client to get its own thread.
+
+    >>> import functools, threading, zc.ngi.generator
+
+    >>> @functools.partial(zc.ngi.async.listener, None, thready=True)
+    ... @zc.ngi.generator.handler
+    ... def listener(conn):
+    ...     if 'client' not in threading.current_thread().name:
+    ...         print 'oops'
+    ...     yield
+    >>> addr = listener.address
+
+    So, now we're listening on listener.address, let's connect to it.
+
+    >>> event = threading.Event()
+    >>> class Connect:
+    ...     def __init__(self, name):
+    ...         self.name = name
+    ...         event.clear()
+    ...         zc.ngi.async.connect(addr, self)
+    ...         event.wait(1)
+    ...     def connected(self, connection):
+    ...         globals()[self.name] = connection
+    ...         zc.ngi.testing.PrintingHandler(connection)
+    ...         event.set()
+
+    Initially, we have no client handling threads:
+
+    >>> def count_client_threads():
+    ...     return len([t for t in threading.enumerate()
+    ...                 if ("%r client" % (addr, )) in t.name])
+    >>> count_client_threads()
+    0
+
+    >>> _ = Connect('c1')
+    >>> _ = Connect('c2')
+
+    So now we have 2 connections and we have 2 corresponding threads:
+
+    >>> count_client_threads()
+    2
+
+    If we close the connections and wait a bit, the threads will be cleaned up:
+
+    >>> c1.close()
+    >>> c2.close()
+    >>> time.sleep(.1)
+
+    >>> count_client_threads()
+    0
+
+    Let's create another connection
+
+    >>> _ = Connect('c1')
+    >>> count_client_threads()
+    1
+
+    Now, we'll close the listener and the connection threads will be cleaned up.
+
+    >>> listener.close()
+    >>> time.sleep(.5)
+    -> CLOSE end of input
+
+    >>> count_client_threads()
+    0
+
+    >>> zc.ngi.async.wait(1)
+
+    """
+
+
 if sys.version_info < (2, 6):
     del setHandler_compatibility
 



More information about the checkins mailing list