[Checkins] SVN: zc.ngi/trunk/ Added an experimental listener option to run each client (server
Jim Fulton
jim at zope.com
Wed Jul 14 10:37:25 EDT 2010
Log message for revision 114745:
Added an experimental listener option to run each client (server
connection) in a separate thread.
Changed:
U zc.ngi/trunk/README.txt
U zc.ngi/trunk/src/zc/ngi/async.py
U zc.ngi/trunk/src/zc/ngi/tests.py
-=-
Modified: zc.ngi/trunk/README.txt
===================================================================
--- zc.ngi/trunk/README.txt 2010-07-14 14:24:25 UTC (rev 114744)
+++ zc.ngi/trunk/README.txt 2010-07-14 14:37:25 UTC (rev 114745)
@@ -20,6 +20,15 @@
*******
====================
+2.0.0a2 (2010-07-??)
+====================
+
+New Features:
+
+- There's a new experimental zc.ngi.async.Implementation.listener
+ option to run each client (server connection) in its own thread.
+
+====================
2.0.0a1 (2010-07-08)
====================
Modified: zc.ngi/trunk/src/zc/ngi/async.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.py 2010-07-14 14:24:25 UTC (rev 114744)
+++ zc.ngi/trunk/src/zc/ngi/async.py 2010-07-14 14:37:25 UTC (rev 114745)
@@ -21,6 +21,7 @@
import os
import socket
import sys
+import thread
import threading
import time
import warnings
@@ -57,7 +58,11 @@
self._callbacks = []
self._start_lock = threading.Lock()
+ thread_ident = None
def call_from_thread(self, func):
+ if thread.get_ident() == self.thread_ident:
+ func()
+ return
self._callbacks.append(func)
self.notify_select()
self.start_thread()
@@ -69,8 +74,8 @@
self.call_from_thread(lambda : _Connector(addr, handler, self))
self.start_thread()
- def listener(self, addr, handler):
- result = _Listener(addr, handler, self)
+ def listener(self, addr, handler, thready=False):
+ result = _Listener(addr, handler, self, thready)
self.start_thread()
return result
@@ -93,7 +98,7 @@
return result
_thread = None
- def start_thread(self, daemon=True):
+ def start_thread(self):
with self._start_lock:
if self._thread is None:
self._thread = threading.Thread(
@@ -111,6 +116,7 @@
raise zc.ngi.interfaces.Timeout
def loop(self, timeout=None):
+ self.thread_ident = thread.get_ident()
if timeout is not None:
deadline = time.time() + timeout
else:
@@ -156,6 +162,7 @@
if timeout <= 0:
raise zc.ngi.interfaces.Timeout
finally:
+ del self.thread_ident
del self.notify_select
trigger.close()
@@ -380,9 +387,9 @@
class _ServerConnection(_Connection):
zc.ngi.interfaces.implements(zc.ngi.interfaces.IServerConnection)
- def __init__(self, sock, addr, logger, listener):
+ def __init__(self, sock, addr, logger, listener, implementation):
self.control = listener
- _Connection.__init__(self, sock, addr, logger, listener.implementation)
+ _Connection.__init__(self, sock, addr, logger, implementation)
def close(self):
_Connection.close(self)
@@ -503,8 +510,7 @@
def handle_error(self):
reason = sys.exc_info()[1]
- #self.logger.exception('listener error')
- traceback.print_exception(*sys.exc_info())
+ self.logger.exception('listener error')
self.close()
self.implementation.handle_error()
@@ -513,10 +519,11 @@
logger = logging.getLogger('zc.ngi.async.server')
- def __init__(self, addr, handler, implementation):
+ def __init__(self, addr, handler, implementation, thready):
self.__handler = handler
self.__close_handler = None
- self.__connections = {}
+ self._thready = thready
+ self.__connections = set()
BaseListener.__init__(self, implementation)
if isinstance(addr, str):
family = socket.AF_UNIX
@@ -574,20 +581,31 @@
if __debug__:
self.logger.debug('incoming connection %r', addr)
- connection = _ServerConnection(sock, addr, self.logger, self)
- self.__connections[connection] = 1
- try:
- self.__handler(connection)
- except:
- self.logger.exception("server handler failed")
- self.close()
+ if self._thready:
+ impl = Implementation(name="%r client" % (self.address,))
+ else:
+ impl = self.implementation
+ connection = _ServerConnection(sock, addr, self.logger, self, impl)
+ self.__connections.add(connection)
+ @impl.call_from_thread
+ def _():
+ try:
+ self.__handler(connection)
+ except:
+ self.logger.exception("server handler failed")
+ self.close()
+
+ if impl is not self.implementation:
+ impl.start_thread()
+
+
def connections(self):
return iter(self.__connections)
def closed(self, connection):
if connection in self.__connections:
- del self.__connections[connection]
+ self.__connections.remove(connection)
if not self.__connections and self.__close_handler:
self.__close_handler(self)
Modified: zc.ngi/trunk/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/tests.py 2010-07-14 14:24:25 UTC (rev 114744)
+++ zc.ngi/trunk/src/zc/ngi/tests.py 2010-07-14 14:37:25 UTC (rev 114745)
@@ -479,6 +479,80 @@
"""
+def EXPERIMENTAL_thready_async_servers():
+ r"""
+ When creating a listener with a zc.ngi.async.Implementation, you can
+ pass a thready keyword option to cause each client to get its own thread.
+
+ >>> import functools, threading, zc.ngi.generator
+
+ >>> @functools.partial(zc.ngi.async.listener, None, thready=True)
+ ... @zc.ngi.generator.handler
+ ... def listener(conn):
+ ... if 'client' not in threading.current_thread().name:
+ ... print 'oops'
+ ... yield
+ >>> addr = listener.address
+
+ So, now we're listening on listener.address, let's connect to it.
+
+ >>> event = threading.Event()
+ >>> class Connect:
+ ... def __init__(self, name):
+ ... self.name = name
+ ... event.clear()
+ ... zc.ngi.async.connect(addr, self)
+ ... event.wait(1)
+ ... def connected(self, connection):
+ ... globals()[self.name] = connection
+ ... zc.ngi.testing.PrintingHandler(connection)
+ ... event.set()
+
+ Initially, we have no client handling threads:
+
+ >>> def count_client_threads():
+ ... return len([t for t in threading.enumerate()
+ ... if ("%r client" % (addr, )) in t.name])
+ >>> count_client_threads()
+ 0
+
+ >>> _ = Connect('c1')
+ >>> _ = Connect('c2')
+
+ So now we have 2 connections and we have 2 corresponding threads:
+
+ >>> count_client_threads()
+ 2
+
+ If we close the connections and wait a bit, the threads will be cleaned up:
+
+ >>> c1.close()
+ >>> c2.close()
+ >>> time.sleep(.1)
+
+ >>> count_client_threads()
+ 0
+
+ Let's create another connection:
+
+ >>> _ = Connect('c1')
+ >>> count_client_threads()
+ 1
+
+ Now, we'll close the listener and the connection threads will be cleaned up.
+
+ >>> listener.close()
+ >>> time.sleep(.5)
+ -> CLOSE end of input
+
+ >>> count_client_threads()
+ 0
+
+ >>> zc.ngi.async.wait(1)
+
+ """
+
+
if sys.version_info < (2, 6):
del setHandler_compatibility
More information about the checkins
mailing list