[Checkins] SVN: zc.ngi/trunk/ Refactored ``zc.ngi.async`` thread management to make the blocking
Jim Fulton
jim at zope.com
Mon Jul 5 18:20:59 EDT 2010
Log message for revision 114220:
Refactored ``zc.ngi.async`` thread management to make the blocking
APIs unnecessary. ``zc.ngi.async.blocking`` is now deprecated.
Made it possible to declare interfaces without zope.interface being
present and added interface declarations on the implementations.
Moved ConnectionFailed to the interfaces module.
Changed:
U zc.ngi/trunk/README.txt
U zc.ngi/trunk/src/zc/ngi/README.txt
U zc.ngi/trunk/src/zc/ngi/async.py
U zc.ngi/trunk/src/zc/ngi/async.txt
U zc.ngi/trunk/src/zc/ngi/blocking.py
U zc.ngi/trunk/src/zc/ngi/blocking.txt
A zc.ngi/trunk/src/zc/ngi/doc/echo_server.py
U zc.ngi/trunk/src/zc/ngi/doc/index.txt
U zc.ngi/trunk/src/zc/ngi/generator.py
U zc.ngi/trunk/src/zc/ngi/interfaces.py
U zc.ngi/trunk/src/zc/ngi/testing.py
U zc.ngi/trunk/src/zc/ngi/tests.py
-=-
Modified: zc.ngi/trunk/README.txt
===================================================================
--- zc.ngi/trunk/README.txt 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/README.txt 2010-07-05 22:20:58 UTC (rev 114220)
@@ -20,7 +20,7 @@
*******
==================
-1.2.0 (2010-06-??)
+1.2.0 (2010-07-??)
==================
New Features:
@@ -32,9 +32,8 @@
- Cleaner testing interfaces
-- Added a new blocking client request interface,
- ``zc.ngi.blocking.request``. Other older blocking APIs are
- deprecated.
+- Refactored ``zc.ngi.async`` thread management to make the blocking
+ APIs unnecessary. ``zc.ngi.async.blocking`` is now deprecated.
- Added support for running multiple ``async`` implementations in
separate threads. This is useful in applications with fewer network
Modified: zc.ngi/trunk/src/zc/ngi/README.txt
===================================================================
--- zc.ngi/trunk/src/zc/ngi/README.txt 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/README.txt 2010-07-05 22:20:58 UTC (rev 114220)
@@ -1,3 +1,8 @@
+Much of the information here is a bit out of date, especially with respect
+to the testing APIs. Testing is now much simpler than it used to be.
+See doc/index.txt.
+
+
=========================
Network Gateway Interface
=========================
Modified: zc.ngi/trunk/src/zc/ngi/async.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.py 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/async.py 2010-07-05 22:20:58 UTC (rev 114220)
@@ -13,6 +13,7 @@
##############################################################################
"""Asyncore-based implementation of the NGI
"""
+from __future__ import with_statement
import asyncore
import errno
@@ -21,9 +22,12 @@
import socket
import sys
import threading
-
+import time
import zc.ngi
+import zc.ngi.interfaces
+zc.ngi.interfaces.moduleProvides(zc.ngi.interfaces.IImplementation)
+
pid = os.getpid()
is_win32 = sys.platform == 'win32'
@@ -43,24 +47,31 @@
BUFFER_SIZE = 8*1024
class Implementation:
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IImplementation)
- def __init__(self):
+ def __init__(self, daemon=True, name='zc.ngi.async application created'):
+ self.name = name
+ self.daemon = daemon
self._map = {}
- self._trigger = _Trigger(self._map)
- self.notify_select = self._trigger.pull_trigger
+ self._callbacks = []
self._start_lock = threading.Lock()
def call_from_thread(self, func):
- self._trigger.callbacks.append(func)
+ self._callbacks.append(func)
self.notify_select()
+ self.start_thread()
+ def notify_select(self):
+ pass
+
def connect(self, addr, handler):
+ self.call_from_thread(lambda : _Connector(addr, handler, self))
self.start_thread()
- self.call_from_thread(lambda : _Connector(addr, handler, self))
def listener(self, addr, handler):
+ result = _Listener(addr, handler, self)
self.start_thread()
- return _Listener(addr, handler, self)
+ return result
def udp(self, address, message):
if isinstance(address, str):
@@ -76,43 +87,103 @@
_udp_socks[family].append(sock)
def udp_listener(self, addr, handler, buffer_size=4096):
+ result = _UDPListener(addr, handler, buffer_size, self)
self.start_thread()
- return _UDPListener(addr, handler, buffer_size, self)
+ return result
_thread = None
- def start_thread(self, daemon=True, name=__name__):
- self._start_lock.acquire()
- try:
+ def start_thread(self, daemon=True):
+ with self._start_lock:
if self._thread is None:
- self._thread = threading.Thread(target=self._loop, name=name)
- self._thread.setDaemon(True)
+ self._thread = threading.Thread(
+ target=self.loop, name=self.name)
+ self._thread.setDaemon(self.daemon)
self._thread.start()
- finally:
- self._start_lock.release()
- def _loop(self):
- timeout = 30.0
+ def wait(self, timeout=None):
+ with self._start_lock:
+ if self._thread is None:
+ return
+ join = self._thread.join
+ join(timeout)
+ if self._thread is not None:
+ raise zc.ngi.interfaces.Timeout
+
+ def loop(self, timeout=None):
+ if timeout is not None:
+ deadline = time.time() + timeout
+ else:
+ deadline = None
+ timeout = 30
map = self._map
+ callbacks = self._callbacks
logger = logging.getLogger('zc.ngi.async.loop')
+ trigger = _Trigger(self._map)
+ self.notify_select = trigger.pull_trigger
- while map:
- try:
- asyncore.poll(timeout, map)
- except:
- traceback.print_exception(*sys.exc_info())
- #logger.exception('loop error')
- raise
+ try:
+ while 1:
+ while callbacks:
+ callback = callbacks.pop(0)
+ try:
+ callback()
+ except:
+ self.logger.exception('Calling callback')
+ self.handle_error()
+
+ if deadline:
+ timeout = min(deadline - time.time(), 30)
+
+ try:
+ if (timeout > 0) and (len(map) > 1):
+ asyncore.poll(timeout, map)
+ except:
+ logger.exception('loop error')
+ raise
+
+ if trigger._fileno is None:
+ # oops, the trigger got closed. Recreate it.
+ trigger = _Trigger(self._map)
+ self.notify_select = trigger.pull_trigger
+
+ with self._start_lock:
+ if (len(map) <= 1) and not callbacks:
+ self._thread = None
+ return
+
+ if timeout <= 0:
+ raise zc.ngi.interfaces.Timeout
+ finally:
+ del self.notify_select
+ trigger.close()
+
def cleanup_map(self):
for c in self._map.values():
if isinstance(c, _Trigger):
continue
- try:
- del self._map[c.fileno()]
- except KeyError:
- pass
c.close()
+ for c in self._map.values():
+ if isinstance(c, _Trigger):
+ continue
+ c.close()
+ def handle_error(self):
+ pass
+
+class Inline(Implementation):
+ """Run in an application thread, rather than a separate thread.
+ """
+
+ def start_thread(self):
+ pass
+
+ def handle_error(self):
+ raise
+
+ def wait(self, *args):
+ self.loop(*args)
+
class dispatcher(asyncore.dispatcher):
def __init__(self, sock, addr, implementation):
@@ -130,6 +201,7 @@
"Exception raised by dispatcher handle_close(%r)",
reason)
self.close()
+ self.implementation.handle_error()
def close(self):
self.del_channel(self._map)
@@ -139,9 +211,8 @@
return False
class _Connection(dispatcher):
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IConnection)
- control = None
-
def __init__(self, sock, addr, logger, implementation):
self.__connected = True
self.__closed = None
@@ -194,8 +265,6 @@
self.__connected = False
self.__output[:] = []
dispatcher.close(self)
- if self.control is not None:
- self.control.closed(self)
self.implementation.notify_select()
def readable(self):
@@ -298,6 +367,19 @@
def handle_expt(self):
self.handle_close('socket error')
+
+class _ServerConnection(_Connection):
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IServerConnection)
+
+ def __init__(self, sock, addr, logger, listener):
+ self.control = listener
+ _Connection.__init__(self, sock, addr, logger, listener.implementation)
+
+ def close(self):
+ _Connection.close(self)
+ self.control.closed(self)
+
+
class _Connector(dispatcher):
logger = logging.getLogger('zc.ngi.async.client')
@@ -347,10 +429,13 @@
if __debug__:
self.logger.debug('connector close %r', reason)
try:
- self.__handler.failed_connect(reason)
- except:
- self.logger.exception("failed_connect(%r) failed", reason)
- self.close()
+ try:
+ self.__handler.failed_connect(reason)
+ except:
+ self.logger.exception("failed_connect(%r) failed", reason)
+ self.implementation.handle_error()
+ finally:
+ self.close()
def handle_write_event(self):
err = self.socket.connect_ex(self.addr)
@@ -386,12 +471,17 @@
"Handler failed_connect(%s) raised an exception", reason,
)
self.close()
+ self.implementation.handle_error()
def handle_expt(self):
self.handle_close('connection failed')
class BaseListener(asyncore.dispatcher):
+ def __init__(self, implementation):
+ self.implementation = implementation
+ asyncore.dispatcher.__init__(self, map=implementation._map)
+
def writable(self):
return False
@@ -407,18 +497,18 @@
#self.logger.exception('listener error')
traceback.print_exception(*sys.exc_info())
self.close()
+ self.implementation.handle_error()
class _Listener(BaseListener):
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IListener)
logger = logging.getLogger('zc.ngi.async.server')
def __init__(self, addr, handler, implementation):
- self.implementation = implementation
- map = implementation._map
self.__handler = handler
self.__close_handler = None
self.__connections = {}
- asyncore.dispatcher.__init__(self, map=map)
+ BaseListener.__init__(self, implementation)
if isinstance(addr, str):
family = socket.AF_UNIX
else:
@@ -453,7 +543,7 @@
self.logger.warn("unable to listen on %r", addr)
raise
- self.add_channel(map)
+ self.add_channel(self._map)
self.address = addr
self.implementation.notify_select()
@@ -475,9 +565,8 @@
if __debug__:
self.logger.debug('incoming connection %r', addr)
- connection = _Connection(sock, addr, self.logger, self.implementation)
+ connection = _ServerConnection(sock, addr, self.logger, self)
self.__connections[connection] = 1
- connection.control = self
try:
self.__handler(connection)
except:
@@ -493,10 +582,8 @@
if not self.__connections and self.__close_handler:
self.__close_handler(self)
- def close(self, handler=None):
- self.accepting = False
- self.del_channel(self._map)
- self.implementation.call_from_thread(self.socket.close)
+ def _close(self, handler):
+ BaseListener.close(self)
if handler is None:
for c in list(self.__connections):
c.handle_close("stopped")
@@ -505,6 +592,15 @@
else:
self.__close_handler = handler
+ def close(self, handler=None):
+ self.accepting = False
+ self.implementation.call_from_thread(lambda : self._close(handler))
+
+ def close_wait(self, timeout=None):
+ event = threading.Event()
+ self.close(lambda _: event.set())
+ event.wait(timeout)
+
# convenience method made possible by storing our address:
def connect(self, handler):
self.implementation.connect(self.address, handler)
@@ -515,11 +611,9 @@
connected = True
def __init__(self, addr, handler, buffer_size, implementation):
- self.implementation = implementation
- map = implementation._map
self.__handler = handler
self.__buffer_size = buffer_size
- asyncore.dispatcher.__init__(self, map=map)
+ BaseListener.__init__(self, implementation)
if isinstance(addr, str):
family = socket.AF_UNIX
else:
@@ -534,7 +628,7 @@
self.close()
self.logger.warn("unable to listen on udp %r", addr)
raise
- self.add_channel(map)
+ self.add_channel(self._map)
self.implementation.notify_select()
def handle_read(self):
@@ -559,9 +653,6 @@
logger = logging.getLogger('zc.ngi.async.trigger')
- def __init__(self):
- self.callbacks = []
-
def writable(self):
return 0
@@ -573,13 +664,6 @@
self.close()
def handle_read(self):
- while self.callbacks:
- callback = self.callbacks.pop(0)
- try:
- callback()
- except:
- self.logger.exception('Calling callback')
-
try:
self.recv(BUFFER_SIZE)
except socket.error:
@@ -589,7 +673,6 @@
class _Trigger(_Triggerbase, asyncore.file_dispatcher):
def __init__(self, map):
- _Triggerbase.__init__(self)
r, self.__writefd = os.pipe()
asyncore.file_dispatcher.__init__(self, r, map)
@@ -619,9 +702,6 @@
class _Trigger(_Triggerbase, asyncore.dispatcher):
def __init__(self, map):
- _Triggerbase.__init__(self)
- _Triggerbase.__init__(self)
-
# Get a pair of connected sockets. The trigger is the 'w'
# end of the pair, which is connected to 'r'. 'r' is put
# in the asyncore socket map. "pulling the trigger" then
@@ -685,7 +765,7 @@
self.logger.debug('notify select %s', pid)
self.trigger.send('x')
-_select_implementation = Implementation()
+_select_implementation = Implementation(name=__name__)
call_from_thread = _select_implementation.call_from_thread
connect = connector = _select_implementation.connect
@@ -695,3 +775,6 @@
udp_listener = _select_implementation.udp_listener
_map = _select_implementation._map
cleanup_map = _select_implementation.cleanup_map
+wait = _select_implementation.wait
+
+main = Inline()
Modified: zc.ngi/trunk/src/zc/ngi/async.txt
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.txt 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/async.txt 2010-07-05 22:20:58 UTC (rev 114220)
@@ -242,5 +242,9 @@
unable to listen on ('127.0.0.1', 9645)
>>> listener.close()
- >>> time.sleep(0.1)
+ >>> zc.ngi.async.cleanup_map()
+ >>> zc.ngi.async.wait(1)
>>> loghandler.uninstall()
+
+.. cleanup
+
Modified: zc.ngi/trunk/src/zc/ngi/blocking.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/blocking.py 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/blocking.py 2010-07-05 22:20:58 UTC (rev 114220)
@@ -11,15 +11,12 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
+from zc.ngi.interfaces import ConnectionFailed
import sys
import threading
import time
import zc.ngi
-class ConnectionFailed(Exception):
- """A Connection attempt failed
- """
-
class Timeout(Exception):
"""An operation timed out.
"""
Modified: zc.ngi/trunk/src/zc/ngi/blocking.txt
===================================================================
--- zc.ngi/trunk/src/zc/ngi/blocking.txt 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/blocking.txt 2010-07-05 22:20:58 UTC (rev 114220)
@@ -1,3 +1,5 @@
+The blocking module is deprecated.
+
=======================
Blocking network access
=======================
@@ -154,3 +156,39 @@
>>> 0.5 <= (time.time() - then) < 1
True
+
+
+Blocking Client Requests
+========================
+
+ >>> import zc.ngi.generator
+ >>> @zc.ngi.generator.handler
+ ... def server(c):
+ ... while 1:
+ ... c.write((yield).upper())
+
+ >>> import zc.ngi.adapters
+ >>> @zc.ngi.adapters.Lines.handler
+ ... def client(c):
+ ... c.write('ho world\n')
+ ... print (yield)
+
+ >>> import zc.ngi.async
+ >>> address = 'ngiexample.zope.com', 9000
+ >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, client)
+ ... # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ ConnectionFailed: ...
+
+The connection above failed because there wasn't a listener.
+Let's try after starting a listener:
+
+ >>> listener = zc.ngi.async.listener(None, server)
+ >>> address = listener.address
+
+ >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, client)
+ HO WORLD
+
+ >>> listener.close()
+ >>> zc.ngi.async.wait(1)
Added: zc.ngi/trunk/src/zc/ngi/doc/echo_server.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/doc/echo_server.py (rev 0)
+++ zc.ngi/trunk/src/zc/ngi/doc/echo_server.py 2010-07-05 22:20:58 UTC (rev 114220)
@@ -0,0 +1,31 @@
+import logging, os, sys, zc.ngi.async
+
+logging.basicConfig()
+
+class Echo:
+
+ def handle_input(self, connection, data):
+ import time; time.sleep(9)
+ connection.write(data.upper())
+
+ def handle_close(self, connection, reason):
+ print 'closed', reason
+
+ def handle_exception(self, connection, exception):
+ print 'oops', exception
+
+def echo_server(connection):
+ connection.setHandler(Echo())
+
+def main(args=None):
+ if args is None:
+ args = sys.argv[1:]
+ address, = args
+ if ':' in address:
+ host, port = address.split(':')
+ address = host, int(port)
+ listener = zc.ngi.async.main.listener(address, echo_server)
+ zc.ngi.async.main.loop()
+
+if __name__ == '__main__':
+ sys.exit(main())
Property changes on: zc.ngi/trunk/src/zc/ngi/doc/echo_server.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Modified: zc.ngi/trunk/src/zc/ngi/doc/index.txt
===================================================================
--- zc.ngi/trunk/src/zc/ngi/doc/index.txt 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/doc/index.txt 2010-07-05 22:20:58 UTC (rev 114220)
@@ -1,4 +1,3 @@
-
Network Gateway Interface
=========================
@@ -32,9 +31,9 @@
implementation supports testing application code without making
network calls.
-NGI defines 2 groups of interfaces, application and implementation.
+NGI defines 2 groups of interfaces: application and implementation.
Application interfaces are implemented by people writing applications
-using NGI.
+and application-level libraries calling implementation interfaces.
NGI is primarily an asynchronous event-driven networking library. Applications
provide handlers that respond to network events. The application
@@ -129,118 +128,89 @@
>>> handler.handle_close(connection, 'done')
closed done
-Imperative handlers using generators
-------------------------------------
+Implementing servers
+====================
-Let's look at a slightly more complicated example. We'll implement
-a simple word-count server connection handler that implements something
-akin to the Unix ``wc`` command. It takes a line of input
-containing a text length followed by length bytes of data. After
-receiving the length bytes of data, it sends back a line of data
-containing line and word counts::
+Implementing servers is only slightly more involved than implementing
+connection handlers. A server is just a callable that takes a
+connection and gives it a handler. For example, we can use a simple
+function to implement a server for the Echo handler::
- class WC:
+ def echo_server(connection):
+ connection.setHandler(Echo())
- input = ''
- count = None
-
- def handle_input(self, connection, data):
- self.input += data
-
- if self.count is None:
- if '\n' not in self.input:
- return
- count, self.input = self.input.split('\n', 1)
- self.count = int(count)
-
- if len(self.input) < self.count:
- return
-
- data = self.input[:self.count]
- self.input = self.input[self.count:]
- self.count = None
- connection.write(
- '%d %d\n' % (len(data.split('\n')), len(data.split())))
-
.. -> src
>>> exec(src)
- >>> handler = WC()
- >>> connection = zc.ngi.testing.Connection()
- >>> handler.handle_input(connection, '15')
- >>> handler.handle_input(connection, '\nhello out\nthere')
- -> '2 3\n'
+Listening for connections
+-------------------------
-Here, we omitted the optional ``handle_close`` and ``handle_exception``
-methods. The implementation is a bit complicated. We have to use
-instance variables to keep track of state between calls. Note that we
-can't count on data coming in a line at a time or make any assumptions
-about the amount of data we'll receive in a ``handle_input`` call.
-The logic is further complicated by the fact that we have two modes of
-collecting input. In the first mode, we're collecting a length. In the
-second mode, we're collecting input for analysis.
+Finally, we have to listen for connections on an address by calling an
+implementation's ``listener`` method. NGI comes with 2 implementation
+modules [#twistedimplementations]_. The ``zc.ngi.testing`` module
+provides an implementation for testing applications.
-Connection handlers can often be simplified by writing them as
-generators, using the ``zc.ngi.generator.handler`` decorator::
+The ``zc.ngi.async`` module provides a collection of implementations
+based on the ``asyncore`` module from the Python standard library.
+These implementations differ based on the way they handle threads.
+Perhaps the simplest of these is the ``zc.ngi.async.main``
+implementation::
- import zc.ngi.generator
+ import zc.ngi.async
- @zc.ngi.generator.handler
- def wc(connection):
- input = ''
- while 1:
- while '\n' not in input:
- input += (yield)
- count, input = input.split('\n', 1)
- count = int(count)
- while len(input) < count:
- input += (yield)
- data = input[:count]
- connection.write(
- '%d %d\n' % (len(data.split('\n')), len(data.split())))
- input = input[count:]
+ address = 'localhost', 8000
+ listener = zc.ngi.async.main.listener(address, echo_server)
+ zc.ngi.async.main.loop()
.. -> src
+ >>> src = src.replace("'localhost', 8000", "None") # pick addr
+ >>> src = src.replace("zc.ngi.async.main.loop()", "")
+ >>> src += """
+ ... import threading
+ ... thread = threading.Thread(target=zc.ngi.async.main.loop)
+ ... thread.setDaemon(True)
+ ... thread.start()
+ ... """
>>> exec(src)
+ >>> import zc.ngi.adapters
+ >>> @zc.ngi.adapters.Lines.handler
+ ... def one(c):
+ ... c.write('one\n')
+ ... print (yield)
+ >>> zc.ngi.async.connect(listener.address, one); zc.ngi.async.wait(1)
+ ONE
+ closed end of input
-The generator takes a connection object and gets data via ``yield``
-statements. The yield statements can raise exceptions. In
-particular, a ``GeneratorExit`` exception is raised when the connection is
-closed. The ``yield`` statement will also (re)raise any exceptions raised
-when calling an iterator passed to ``writelines``.
+ >>> listener.close()
-A generator-based handler is instantiated by calling it with a
-connection object::
+ >>> thread.join()
+ >>> del thread, one
- >>> handler = wc(connection)
- >>> handler.handle_input(connection, '15')
- >>> handler.handle_input(connection, '\nhello out\nthere')
- -> '2 3\n'
+In this example, we listen for connections to our echo server on port
+8000. The listener method returns a listener object. We'll say more
+about these objects in a little bit. We then call the
+``zc.ngi.async.main.loop`` method, which blocks until either:
- >>> handler.handle_close(connection, 'done')
+- A handler raises an exception, or
-Implementing servers
-====================
+- there are no active handlers.
-Implementing servers is only slightly more involved that implementing
-connection handlers. A server is just a callable that takes a
-connection and gives it a handler. For example, we can use a simple
-function to implement a server for the Echo handler::
+I encourage you to try the above example. Write a script that contains
+the ``Echo`` and ``echo_server`` implementations and that calls
+``zc.ngi.async.main.listener`` and ``zc.ngi.async.main.loop`` as shown
+above. Run the script in a shell/terminal window and, in a separate
+window, telnet to your server and type some text.
- def echo_server(connection):
- connection.setHandler(Echo())
+Implementing servers as connection handler classes
+--------------------------------------------------
-.. -> src
+It's often simplest to implement a server using a connection handler
+class that takes a connection in its constructor:
- >>> exec(src)
+ class EchoServer:
-It's usually simpler to just use a connection handler class as a
-server by calling setHandler in the constructor::
-
- class Echo:
-
def __init__(self, connection):
connection.setHandler(self)
@@ -257,71 +227,70 @@
>>> exec(src)
-In this case, the class is a server. It's instances are connection
-handlers.
+ >>> handler = EchoServer(connection)
+ >>> connection.peer.write('Hi world')
+ -> 'HI WORLD'
+ >>> connection.peer.close()
+ closed closed
-Handlers created from generators can be used as servers directly.
+Remember a server is just a callable that takes a connection and
+sets its handler.
-Listening for connections
--------------------------
+Testing servers
+---------------
-Finally, we have to listen for connections on an address by calling an
-implementation's ``listener`` method. NGI comes with 2 implementations
-[#twistedimplementations]_, an implementation based on the ``asyncore``
-module from the standard library, ``zc.ngi.async``, and a testing
-implementation, ``zc.ngi.testing``. To listen for network
-connections on ``localhost`` port ``8000```, we'd use::
+When testing servers, we'll often use the
+``zc.ngi.testing.listener`` function::
- >>> import zc.ngi.async
+ >>> listener = zc.ngi.testing.listener(address, EchoServer)
- >>> address = 'localhost', 8000
- >>> listener = zc.ngi.async.listener(address, Echo)
+Generally, the address will either be a host/port tuple or the name of
+a Unix domain socket, although an implementation may define a custom
+address representation. The ``zc.ngi.testing.listener`` function will
+take any hashable address object.
-.. -> src
+We can connect to a *testing* listener using its connect method::
- Serious hi jinks here. :) The above doctest only *looks* like a
- doctest. We actually turn it into Python code and exec it below.
- We do this so we can replace the code that sets address to set it
- to None so that the listener will just pick an available address.
+ >>> connection = listener.connect()
- >>> exec(src.replace('>>> ', '').replace("'localhost', 8000", 'None'))
+The connection returned from ``listener.connect`` is not the connection
+passed to the server. Instead, it's a test connection that we can use
+as if we're writing a client::
- >>> import logging, sys
- >>> loghandler = logging.StreamHandler(sys.stdout)
- >>> logging.getLogger('zc.ngi').addHandler(loghandler)
- >>> logging.getLogger('zc.ngi').setLevel(logging.ERROR)
+ >>> connection.write('Hi\nthere.')
+ -> 'HI\nTHERE.'
- Echo's ``handle_close`` is problematic when using async, due to timing
- uncertainty.
+It is actually a peer of the connection passed to the server. Testing
+connections have peer attributes that you can use to get to the peer
+connection::
- >>> Echo.handle_close = lambda *args: None
+ >>> connection.peer.peer is connection
+ True
+ >>> list(listener.connections()) == [connection.peer]
+ True
- >>> class EC:
- ... def connected(self, connection):
- ... connection.setHandler(self)
- ... connection.write('test data')
- ... input = ''
+The test connection has a default handler that just prints data to
+standard output, but we can call ``setHandler`` on it to use a different
+handler::
+
+ >>> class Handler:
... def handle_input(self, connection, data):
- ... self.input += data
- ... if self.input == 'TEST DATA':
- ... print self.input
- ... connection.close()
+ ... print 'got', `data`
+ >>> connection.setHandler(Handler())
+ >>> connection.write('take this')
+ got 'TAKE THIS'
- >>> import zc.ngi.blocking, time
- >>> address = listener.address
+Now, the data sent back from the server is handled by our custom
+handler, rather than the default one.
- We need the ``time.sleep`` call to give the server time to
- get its connection closed.
+.. cleanup
- >>> zc.ngi.blocking.request(
- ... zc.ngi.async.connect, address, EC(), 3); time.sleep(.1)
- TEST DATA
+ >>> listener.close()
+ closed stopped
+Listener objects
+----------------
-The listener call immediately returns a listener object. The
-servicing of requests is done in a separate daemon thread provided by
-``zc.ngi.async``.
-
Listener objects, returned by an implementation's ``listener`` method,
provide methods for controlling listeners. The connections method
returns an iterable of open connections to a server::
@@ -333,95 +302,238 @@
>>> listener.close()
-.. test it
- >>> time.sleep(.1)
- >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, EC)
- ... # doctest: +ELLIPSIS
- Traceback (most recent call last):
- ...
- ConnectionFailed: ...
-
.. XXX Future
There's also a ``close_wait`` method that stops listening and waits
for a given period of time for clients to finish on their own before
closing them.
-NGI doesn't keep the main thread running
-----------------------------------------
-An important thing to note about NGI is that it doesn't provide
-support for maintaining the main application thread. The threads it
-creates for itself are "daemon" threads, meaning they don't keep an
-application running when the main thread exits. If a main program
-ended with an implementation's listener call. the program would likely
-exit before the listener had a chance to get and service any
-connections.
+Threading
+=========
-It's up to you to keep an application running. Some frameworks provide
-a ``loop_forever`` call. The closest thing in NGI is::
+NGI tries to accommodate threaded applications without imposing
+thread-safety requirements.
- import threading
- event = Threading.Event()
- event.wait()
+- Implementation (``IImplementation``) methods ``connect``, ``listener``,
+ ``udp`` and ``udp_listener`` are thread safe. They may be called at
+ any time by any thread.
-If you wanted to provide a way to gracefully shut down an application,
-you'd provide some communication channel, such as a signal handler,
-that closed any listeners and then set the event blocking the main
-thread from exiting.
+- Connection (``IConnection``) methods ``write``, ``writelines``, and
+ ``close`` are thread safe. They may be called at
+ any time by any thread.
-Testing servers
----------------
+ The connection setHandler method must only be called in a connect
+ handler's ``connected`` method or a connection handler's
+ ``handle_input`` method.
-When testing servers, we'll often use the
-``zc.ngi.testing.listener`` function::
+- Listener (``IListener``) methods ``connections`` and ``close`` are
+ thread safe. They may be called at
+ any time by any thread.
- >>> listener = zc.ngi.testing.listener(address, Echo)
+- Application handler methods need not be thread safe. NGI
+ implementations will never call them from more than one thread at a
+ time.
-Generally, the address will either be a host/port tuple or the name of
-a Unix domain socket, although an implementation may define a custom
-address representation. The ``zc.ngi.testing.listener`` function will
-take any hashable address object.
+``zc.ngi.async`` implementations and threading
+----------------------------------------------
-We can connect to a *testing* listener using its connect method::
+The ``zc.ngi.async`` module provides a number of threading models. The
+``zc.ngi.async`` module works by running one or more "loops".
+These loops wait for networking events and call application handlers.
- >>> connection = listener.connect()
+One application-controlled main loop
+ In this model, the application is responsible for calling
+ ``zc.ngi.async.main.loop``, typically from an application's main
+ thread. This is most appropriate for simple single-threaded
+ applications that do nothing but respond to application events.
-The connection returned from ``listener.connect`` is not the connection
-passed to the server. Instead, it's a test connection that we can use
-as if we're writing a client::
+ The ``loop`` call blocks until an exception is raised by a handler or
+ until there are no more handlers registered with the implementation.
- >>> connection.write('Hi\nthere.')
- -> 'HI\nTHERE.'
+One ``zc.ngi.async``-controlled loop
+ In this model, the
+ ``zc.ngi.async`` module maintains its own loop thread. This is
+ the default implementation, provided by the module itself. It is
+ appropriate when implementing libraries that perform networking
+ to perform their function. The advantage of this approach is that
+ it is less intrusive to applications. The loop thread is managed
+ automatically.
-It is actually a peer of the connection passed to the server. Testing
-connections have peer attributes that you can use to get to the peer
-connection::
+ Note that the thread used by ``zc.ngi.async`` is "daemonic", meaning
+ that if the main program thread exits, the ``zc.ngi.async`` thread
+ won't keep the program running. If a program registers handlers with
+ the ``zc.ngi.async`` implementation and then exits, the program will
+ exit without the handlers being called. If the application doesn't
+ have other work to do, it should use ``zc.ngi.async.main`` or take
+ other steps to keep the application running.
- >>> connection.peer.peer is connection
- True
- >>> list(listener.connections()) == [connection.peer]
- True
+Multiple ``zc.ngi.async`` implementations and implementation-managed threads
+ You can instantiate ``zc.ngi.async.Implementation`` objects, which
+ provide the :class:`~zc.ngi.interfaces.IImplementation` interface and
+ each have their own networking loop, running in a separate thread.
+ For example, if you have an application that has multiple network
+ servers or multiple long-lived clients, it can be desirable to run
+ each using its own implementation.
-The test connection has a default handler that just prints data to
-standard output, but we can call ``setHandler`` on it to use a different
-handler::
+Multiple ``zc.ngi.async`` implementations and application-managed threads
+ You can instantiate ``zc.ngi.async.Inline`` objects, which provide
+ the :class:`~zc.ngi.interfaces.IImplementation` interface and have
+ a blocking loop method that you must call yourself. Use this
+ implementation class to manage threads yourself. The loop method
+ returns when an exception is raised by a handler or when there
+ are no handlers registered with the implementation.
+ ``zc.ngi.async.main`` is a ``zc.ngi.async.Inline`` instance.
- >>> class Handler:
- ... def handle_input(self, connection, data):
- ... print 'got', `data`
- >>> connection.setHandler(Handler())
- >>> connection.write('take this')
- got 'TAKE THIS'
+Performance issues with a single loop
+-------------------------------------
-Now, the data sent back from the server is handled by our custom
-handler, rather than the default one.
+With a single loop, all networking activity is done in one thread.
+If a handler takes a long time to perform some function, it can
+prevent other networking activity from proceeding. For this reason,
+when a single loop is used, it's important that handlers perform their
+work quickly, without blocking for any length of time.
-.. cleanup
+If a loop is only servicing a single handler, or a small number of
+handlers, it's not a problem if a handler takes a long time to respond
+to a network event.
- >>> listener.close()
+If you need to do a lot of work in response to network events,
+consider using multiple loops, or using thread pools (or
+multiprocessing pools) connected to your handlers with queues.
+Threads are heavier than handlers
+---------------------------------
+
+If you're going to be dealing with lots of network connections, it's
+probably better to use a single loop (or few loops) and use
+non-blocking handlers. Many non-blocking handlers can be efficiently
+managed at once. Compared to handlers, threads are relatively heavy
+weight, with large memory requirements and relatively long start-up times.
+
+Imperative handlers using generators
+====================================
+
+We saw earlier that we implemented connection handlers by implementing
+the :class:`~zc.ngi.interfaces.IConnectionHandler` in a class that
+provided, at a minimum, a ``handle_input`` method. This is pretty
+straightforward. The ``handle_input`` method simply reacts to input data.
+Unfortunately, for many applications, this can make application logic
+harder to express. Sometimes, a more imperative style leads to
+simpler application logic.
+
+Let's look at an example. We'll implement a simple word-count server
+connection handler that implements something akin to the Unix ``wc``
+command. It takes a line of input containing a text length followed
+by length bytes of data. After receiving the length bytes of data, it
+sends back a line of data containing line and word counts::
+
+ class WC:
+
+ input = ''
+ count = None
+
+ def handle_input(self, connection, data):
+ self.input += data
+
+ if self.count is None:
+ if '\n' not in self.input:
+ return
+ count, self.input = self.input.split('\n', 1)
+ self.count = int(count)
+
+ if len(self.input) < self.count:
+ return
+
+ data = self.input[:self.count]
+ self.input = self.input[self.count:]
+ self.count = None
+ connection.write(
+ '%d %d\n' % (len(data.split('\n')), len(data.split())))
+
+.. -> src
+
+ >>> exec(src)
+
+ >>> handler = WC()
+ >>> connection = zc.ngi.testing.Connection()
+ >>> handler.handle_input(connection, '15')
+ >>> handler.handle_input(connection, '\nhello out\nthere')
+ -> '2 3\n'
+
+Here, we omitted the optional ``handle_close`` and ``handle_exception``
+methods. The implementation is a bit complicated. We have to use
+instance variables to keep track of state between calls. Note that we
+can't count on data coming in a line at a time or make any assumptions
+about the amount of data we'll receive in a ``handle_input`` call.
+The logic is further complicated by the fact that we have two modes of
+collecting input. In the first mode, we're collecting a length. In the
+second mode, we're collecting input for analysis.
+
+Connection handlers can often be simplified by writing them as
+generators, using the ``zc.ngi.generator.handler`` decorator::
+
+ import zc.ngi.generator
+
+ @zc.ngi.generator.handler
+ def wc(connection):
+ input = ''
+ while 1:
+ while '\n' not in input:
+ input += (yield)
+ count, input = input.split('\n', 1)
+ count = int(count)
+ while len(input) < count:
+ input += (yield)
+ data = input[:count]
+ connection.write(
+ '%d %d\n' % (len(data.split('\n')), len(data.split())))
+ input = input[count:]
+
+.. -> src
+
+ >>> exec(src)
+
+The generator takes a connection object and gets data via ``yield``
+expressions. The yield expressions can raise exceptions. In
+particular, a ``GeneratorExit`` exception is raised when the
+connection is closed by the connection peer. The ``yield`` statement
+will also (re)raise any exceptions raised when calling an iterator
+passed to ``writelines``.
+
+A generator-based handler is instantiated by calling it with a
+connection object::
+
+ >>> handler = wc(connection)
+ >>> handler.handle_input(connection, '15')
+ >>> handler.handle_input(connection, '\nhello out\nthere')
+ -> '2 3\n'
+
+ >>> handler.handle_close(connection, 'done')
+
+There are a number of things to note about generator-based handlers:
+
+- The logic is expressed imperatively. We don't have to keep track of
+ what mode we're in. We progress naturally from one mode to another
+ as we progress through the generator function logic.
+
+- A handler is implemented as a function, rather than a class.
+
+- The ``generator`` decorator creates an object that, when called with
+ a connection, returns an object that implements the full
+ :class:`~zc.ngi.interfaces.IConnectionHandler` interface. The
+ optional methods are handled by throwing exceptions to the generator
+ function. A generator function can handle these events by
+ providing exception handlers.
+
+- The ``generator`` decorator creates an object that implements
+ :class:`~zc.ngi.interfaces.IServer` and can be used as a server.
+
+- The ``generator`` decorator creates an object that minimally implements
+ :class:`~zc.ngi.interfaces.IClientConnectHandler` and can be used as
+ a client connection handler, as described later.
+
+
Implementing clients
====================
@@ -435,7 +547,7 @@
is called if it fails.
Let's implement a word-count client. It will take a string and use a
-work-count server to get its line and word counts::
+word-count server to get its line and word counts::
class WCClient:
@@ -559,6 +671,13 @@
<BLANKLINE>
-> CLOSE
+A generator can also be used as a client connect handler. The
+``failed_connect`` method provided by a generator handler simply
+raises an exception. For this reason, generator handlers are
+generally only appropriate in ad hoc situations, like simple client
+scripts, typically using ``zc.ngi.async.main``, where exceptions are
+propagated to the ``zc.ngi.async.main.loop`` call.
+
Connecting
----------
@@ -718,96 +837,6 @@
For address, ('', 8000), a connect handler called connect from a
failed_connect call.
-Connectors return immediately
------------------------------
-
-An important thing to note about making connections is that connector
-calls return immediately. Connections are made and connection
-handlers are called in separate threads. This means that you can have
-many outstanding connect requests active at once. It also means that,
-as with servers, it is your responsibility to keep client programs
-running while handlers are doing their work.
-
-Blocking Client Requests
-------------------------
-
-Event-based APIs can be very convenient when implementing servers,
-and sometimes even when implementing clients. In many cases though,
-simple clients can be problematic because, as mentioned in the
-previous section, calls to connectors are made in a separate thread. A
-call to an implementation's ``connect`` method returns immediately,
-before a connection is made and handled. A simple script that makes a
-single request to a server has to wait for a request to be completed
-before exiting.
-
-To support the common use case of a client that makes a single request
-(or small finite number of requests) to a server, the
-``zc.ngi.blocking`` module provides a ``request`` function that makes
-a single request and blocks until the request has completed. The
-request function takes a connector, an address, and a connect
-handler. In the example above, we used the ``zc.ngi.async``
-implementation's ``connect`` function as the connector.
-
-If the connection fails, an exeption is raised::
-
- >>> import zc.ngi.blocking
- >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, WCClient)
- ... # doctest: +ELLIPSIS
- Traceback (most recent call last):
- ...
- ConnectionFailed: ...
-
-The connection above failed because there wasn't a listener.
-Let's try after starting a listener:
-
-.. let the listener pick the address below:
-
- >>> address = None
-
-::
-
- >>> listener = zc.ngi.async.listener(address, wc)
-
-.. use the listener's address
-
- >>> address = listener.address
-
-::
-
- >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, WCClient('xxx'))
- WCClient got 1 1
- <BLANKLINE>
-
-You can also pass a connection handler or a generator handler to
-``zc.ngi.blocking.request``::
-
- >>> @zc.ngi.generator.handler
- ... def client(connection):
- ... data = "hello\nworld.\n"
- ... connection.write("%s\n%s" % (len(data), data))
- ... input = ''
- ... while '\n' not in input:
- ... input += (yield)
- ... print 'Got', input
-
- >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, client)
- Got 3 2
- <BLANKLINE>
-
-.. cleanup
-
- >>> listener.close()
-
-The ``zc.ngi.blocking`` module has some other APIs for writing
-blocking network programs in an imperative style. These were written
-before ``zc.ngi.generator`` and ``zc.ngi.blocking.request``. Now
-``zc.ngi.generator`` allows handlers to be written in an imperative
-style without giving up the advantages, especially for testing, of
-reactive handlers. The ``zc.ngi.blocking.request`` function now
-makes it easy for simple client programs to wait for requests to
-complete. For these reasons, the older blocking APIs are now
-deprecated.
-
Connection Adapters
===================
@@ -950,6 +979,116 @@
Here we've defined a defined a generator-based adapter that uses the
``Lines`` adapter.
+Blocking client scripts
+=======================
+
+You may need to make a few networking requests in a script. You
+typically want to make the requests, block until they're done, and
+then go on about your business. The ``zc.ngi.async`` implementations
+provide a ``wait`` method that can be used in this situation. The
+``wait`` method blocks until there are no outstanding requests, or
+until an optional timeout has passed.
+
+For example, suppose a word-count server is running on an address. We
+can use the following script to get the word counts for a set of
+strings::
+
+ result = []
+
+ def get_word_count(s):
+
+ @zc.ngi.adapters.Lines.handler
+ def getwc(connection):
+ connection.write("%s\n" % len(s))
+ connection.write(s)
+ result.append((yield))
+
+ zc.ngi.async.main.connect(address, getwc)
+
+ for s in 'Hello\nworld\n', 'hi\n':
+ get_word_count(s)
+
+ zc.ngi.async.main.wait(10)
+
+ print sorted(result)
+
+.. -> src
+
+ >>> src = src.replace("10", ".2") # don't wait so long
+
+ w/o timeout:
+
+ >>> listener = zc.ngi.async.listener(None, wc)
+ >>> address = listener.address
+ >>> exec(src)
+ ['2 1', '3 2']
+
+ >>> listener.close()
+
+ w timeout:
+
+ >>> import time
+ >>> @zc.ngi.generator.handler
+ ... def echo_slowly(c):
+ ... s = (yield)
+ ... time.sleep(.5)
+ ... c.write(s.upper())
+
+ >>> listener = zc.ngi.async.listener(None, echo_slowly)
+ >>> address = listener.address
+
+ >>> exec(src)
+ Traceback (most recent call last):
+ ...
+ Timeout
+
+ >>> zc.ngi.async.main.wait(1) # wait for the slow echo to finish
+
+ >>> listener.close()
+
+ Now, try a non-inline version.
+
+ >>> impl = zc.ngi.async.Implementation()
+ >>> src = src.replace("zc.ngi.async.main", "impl")
+
+ w/o timeout:
+
+ >>> listener = zc.ngi.async.listener(None, wc)
+ >>> address = listener.address
+ >>> exec(src)
+ ['2 1', '3 2']
+
+ >>> listener.close()
+
+ w timeout:
+
+ >>> listener = zc.ngi.async.listener(None, echo_slowly)
+ >>> address = listener.address
+
+ >>> exec(src)
+ Traceback (most recent call last):
+ ...
+ Timeout
+
+ >>> listener.close()
+
+ Cleanup:
+
+ >>> zc.ngi.async.wait(1)
+
+If the wait call times out, a ``zc.ngi.interfaces.Timeout`` exception
+will be raised.
+
+Most scripts will use an ``Inline`` implementation, like
+``zc.ngi.async.main`` because errors raised by handlers are propagated
+to the callers.
+
+A possible advantage of the non-inline implementations
+(``zc.ngi.async`` and instances of ``zc.ngi.async.Implementation``) is
+that, because the network requests are handled in a separate thread,
+an application can do other work while requests are being handled and
+before calling wait.
+
UDP
===
@@ -972,32 +1111,6 @@
>>> listener.close()
>>> zc.ngi.testing.udp(('', 8000), 'hello udp')
-Threading
-=========
-
-NGI tries to accommodate threaded applications without imposing
-thread-safety requirements.
-
-- Implementation (``IImplementation``) methods ``connect``, ``listener``,
- ``udp`` and ``udp_listener`` are thread safe. They may be called at
- any time by any thread.
-
-- Connection (``IConnection``) methods ``write``, ``writelines``, and
- ``close`` are thread safe. They may be called at
- any time by any thread.
-
- The connection setHandler method must only be called in a connect
- handler's ``connected`` method or a connection handler's
- ``handle_input`` method.
-
-- Listener (``IListener``) methods ``connections`` and ``close`` are
- thread safe. They may be called at
- any time by any thread.
-
-- Application handler methods need not be thread safe. NGI
- implementations will never call them from more than one thread at a
- time.
-
----------------------
.. [#twisted] The Twisted networking framework also provides this
@@ -1005,14 +1118,11 @@
testing environment as NGI does, although it's likely that it will
in the future.
+ A twisted implementation for NGI is planned.
+
.. [#writelines] The ``writelines`` method takes an iterable object.
.. [#twistedimplementations] A number of implementations based on
Twisted are planned, including a basic Twisted implementation and
an implementation using ``twisted.conch`` that will support
communication over ssh channels.
-
-.. cleanup
-
- >>> logging.getLogger('zc.ngi').removeHandler(loghandler)
- >>> logging.getLogger('zc.ngi').setLevel(logging.NOTSET)
Modified: zc.ngi/trunk/src/zc/ngi/generator.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/generator.py 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/generator.py 2010-07-05 22:20:58 UTC (rev 114220)
@@ -11,14 +11,17 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
+import zc.ngi.interfaces
-
def handler(func=None, connection_adapter=None):
if func is None:
return lambda func: Handler(func, connection_adapter)
return Handler(func, connection_adapter)
class Handler(object):
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IServer,
+ zc.ngi.interfaces.IClientConnectHandler,
+ )
def __init__(self, func, connection_adapter):
self.func = func
@@ -44,7 +47,13 @@
ConnectionHandler(self.func(inst, connection), connection)
)
+ connected = __call__
+
+ def failed_connect(self, reason):
+ raise zc.ngi.interfaces.ConnectionFailed(reason)
+
class ConnectionHandler(object):
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IConnectionHandler)
def __init__(self, gen, connection):
try:
Modified: zc.ngi/trunk/src/zc/ngi/interfaces.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/interfaces.py 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/interfaces.py 2010-07-05 22:20:58 UTC (rev 114220)
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2006 Zope Corporation and Contributors.
+# Copyright (c) 2006-2010 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -12,13 +12,19 @@
#
##############################################################################
+class Interface:
+ pass
+def Attribute(text):
+ return text
+def implements(*args):
+ pass
+moduleProvides = implements
+
try:
- from zope.interface import Interface, Attribute
+ raise ImportError
+ from zope.interface import Interface, Attribute, implements, moduleProvides
except ImportError:
- class Interface:
- pass
- def Attribute(text):
- return text
+ pass
class IImplementation(Interface):
"""Standard interface for ngi implementations
@@ -126,7 +132,7 @@
This is an implementation interface.
"""
- control = Attribute("An IServerControl")
+ control = Attribute("An IListener")
class IConnectionHandler(Interface):
"""Application objects that can handle connection input-data events
@@ -247,3 +253,11 @@
def close():
"""Close the listener
"""
+
+class ConnectionFailed(Exception):
+ """A Connection attempt failed
+ """
+
+class Timeout(Exception):
+ """Something took too long
+ """
Modified: zc.ngi/trunk/src/zc/ngi/testing.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/testing.py 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/testing.py 2010-07-05 22:20:58 UTC (rev 114220)
@@ -19,7 +19,10 @@
import sys
import traceback
import zc.ngi
+import zc.ngi.interfaces
+zc.ngi.interfaces.moduleProvides(zc.ngi.interfaces.IImplementation)
+
class PrintingHandler:
def __init__(self, connection):
@@ -44,13 +47,14 @@
class Connection:
- control = None
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IConnection)
def __init__(self, peer=None, handler=PrintingHandler):
self.handler = None
self.closed = False
self.input = ''
self.exception = None
+ self.control = None
if peer is None:
peer = Connection(self)
handler(peer)
@@ -160,18 +164,17 @@
else:
self._callHandler('handle_exception', exception)
+class _ServerConnection(Connection):
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IServerConnection)
+
class TextPrintingHandler(PrintingHandler):
def handle_input(self, connection, data):
print data,
-class TextConnection(Connection):
+def TextConnection(peer=None, handler=TextPrintingHandler):
+ return Connection(peer, handler)
- control = None
-
- def __init__(self, peer=None, handler=TextPrintingHandler):
- Connection.__init__(self, peer, handler)
-
_connectable = {}
_recursing = object()
def connect(addr, handler):
@@ -202,6 +205,7 @@
_connectable.setdefault(addr, []).append(connection)
class listener:
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IListener)
def __init__(self, addr, handler=None):
if handler is None:
@@ -222,7 +226,7 @@
if self._handler is None:
raise TypeError("Listener closed")
if connection is None:
- connection = Connection()
+ connection = _ServerConnection()
peer = connection.peer
else:
peer = None
@@ -299,6 +303,7 @@
_udp_handlers = {}
class udp_listener:
+ zc.ngi.interfaces.implements(zc.ngi.interfaces.IUDPListener)
def __init__(self, address, handler=None, buffer_size=4096):
if handler is None:
Modified: zc.ngi/trunk/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/tests.py 2010-07-05 22:20:56 UTC (rev 114219)
+++ zc.ngi/trunk/src/zc/ngi/tests.py 2010-07-05 22:20:58 UTC (rev 114220)
@@ -24,8 +24,6 @@
import zc.ngi.testing
import zc.ngi.wordcount
-zc.ngi.async.start_thread() # Make sure the thread is already running
-
def test_async_cannot_connect():
"""Let's make sure that the connector handles connection failures correctly
@@ -56,10 +54,46 @@
"""
-def async_thread_has_name():
+def async_thread_management():
"""
+
+ There's no thread by default:
+
>>> len([t for t in threading.enumerate() if t.getName() == 'zc.ngi.async'])
+ 0
+
+ There's a default name:
+
+ >>> listener = zc.ngi.async.listener(None, lambda _: None)
+ >>> len([t for t in threading.enumerate() if t.getName() == 'zc.ngi.async'])
1
+ >>> listener.close()
+ >>> zc.ngi.async.wait(1)
+
+ When there's nothing to do, the thread goes away:
+
+ >>> len([t for t in threading.enumerate() if t.getName() == 'zc.ngi.async'])
+ 0
+
+ If we create our own implementation, we can give it a name:
+
+ >>> impl = zc.ngi.async.Implementation(name='bob')
+ >>> listener = impl.listener(None, lambda _: None)
+ >>> len([t for t in threading.enumerate() if t.getName() == 'bob'])
+ 1
+ >>> listener.close()
+ >>> impl.wait(1)
+
+ Otherwise, it gets a slightly more descriptive name:
+
+ >>> impl = zc.ngi.async.Implementation('')
+ >>> listener = impl.listener(None, lambda _: None)
+ >>> len([t for t in threading.enumerate()
+ ... if t.getName() == 'zc.ngi.async application created'])
+ 1
+ >>> listener.close()
+ >>> impl.wait(1)
+
"""
def blocking_connector_handles_failed_connect():
@@ -114,6 +148,7 @@
Get size of socket map:
+ >>> zc.ngi.async.wait(1)
>>> size = len(zc.ngi.async._map)
Now, trying to create a listener on the port should fail, and the
@@ -182,6 +217,9 @@
>>> logger.removeHandler(log_handler)
>>> logger.setLevel(logging.NOTSET)
+ >>> listener.close()
+ >>> zc.ngi.async.wait(1)
+
"""
def when_a_server_closes_a_connection_blocking_request_returns_reason():
@@ -363,6 +401,9 @@
zc.ngi.async.listener(addr, BrokenAfterConnect())
zc.ngi.async.connect(addr, BrokenAfterConnect())
+def cleanup_async(test):
+ zc.ngi.async.cleanup_map()
+ zc.ngi.async.wait(1)
def test_suite():
return unittest.TestSuite([
@@ -380,9 +421,9 @@
),
doctest.DocFileSuite(
'async.txt',
- setUp=async_evil_setup,
+ setUp=async_evil_setup, tearDown=cleanup_async,
),
- doctest.DocTestSuite(),
+ doctest.DocTestSuite(setUp=cleanup_async, tearDown=cleanup_async),
])
if __name__ == '__main__':
More information about the checkins
mailing list