[Checkins] SVN: zc.ngi/trunk/ New Features:
Jim Fulton
jim at zope.com
Sun Jun 20 11:53:59 EDT 2010
Log message for revision 113716:
New Features:
- New improved documentation
- Support for writing request handlers in an imperative style using
generators.
- Cleaner testing interfaces
- Added a new blocking client request interface,
``zc.ngi.blocking.request``. Other older blocking APIs are
deprecated.
- Dropped support for Python 2.4.
Bugs Fixed:
- The ``Sized`` request adapter's ``writelines`` method was broken.
- There were a number of problems with error handling in the ``async``
implementation.
Changed:
U zc.ngi/trunk/README.txt
U zc.ngi/trunk/buildout.cfg
U zc.ngi/trunk/setup.py
U zc.ngi/trunk/src/zc/ngi/adapters.py
U zc.ngi/trunk/src/zc/ngi/adapters.txt
U zc.ngi/trunk/src/zc/ngi/async.py
U zc.ngi/trunk/src/zc/ngi/async.txt
U zc.ngi/trunk/src/zc/ngi/blocking.py
U zc.ngi/trunk/src/zc/ngi/blocking.txt
A zc.ngi/trunk/src/zc/ngi/doc/
U zc.ngi/trunk/src/zc/ngi/doc/index.txt
A zc.ngi/trunk/src/zc/ngi/generator.py
U zc.ngi/trunk/src/zc/ngi/interfaces.py
U zc.ngi/trunk/src/zc/ngi/testing.py
U zc.ngi/trunk/src/zc/ngi/tests.py
-=-
Modified: zc.ngi/trunk/README.txt
===================================================================
--- zc.ngi/trunk/README.txt 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/README.txt 2010-06-20 15:53:58 UTC (rev 113716)
@@ -1,18 +1,21 @@
-*************************
Network Gateway Interface
*************************
-Network programs are typically difficult to test because they require
-setting up network connections, clients, and servers. In addition,
-application code gets mixed up with networking code.
+The Network Gateway Interface provides:
-The Network Gateway Interface (NGI) seeks to improve this situation by
-separating application code from network code. This allows
-application and network code to be tested independently and provides
-greater separation of concerns.
+- the ability to test application networking code without use of
+ sockets, threads or subprocesses
-.. contents::
+- clean separation of application code and low-level networking code
+- a fairly simple, inheritance-free set of networking APIs
+
+- an event-based framework that makes it easy to handle many
+ simultaneous connections while still supporting an imperative
+ programming style.
+
+To learn more, see http://packages.python.org/zc.ngi/
+
Changes
*******
Modified: zc.ngi/trunk/buildout.cfg
===================================================================
--- zc.ngi/trunk/buildout.cfg 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/buildout.cfg 2010-06-20 15:53:58 UTC (rev 113716)
@@ -1,24 +1,17 @@
[buildout]
develop = .
-parts = test py
+parts = test py sphinx
[test]
recipe = zc.recipe.testrunner
eggs = zc.ngi [test]
-[test2.4]
-recipe = zc.recipe.testrunner
-eggs = zc.ngi [test]
-python = python2.4
-
[test2.5]
-recipe = zc.recipe.testrunner
-eggs = zc.ngi [test]
+<= test
python = python2.5
[test2.6]
-recipe = zc.recipe.testrunner
-eggs = zc.ngi [test]
+<= test
python = python2.6
[py]
@@ -26,3 +19,8 @@
eggs = zc.ngi
interpreter = py
+[sphinx]
+recipe = zc.recipe.egg
+eggs = sphinx
+ Pygments
+ zc.ngi
Modified: zc.ngi/trunk/setup.py
===================================================================
--- zc.ngi/trunk/setup.py 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/setup.py 2010-06-20 15:53:58 UTC (rev 113716)
@@ -14,49 +14,27 @@
name, version = 'zc.ngi', '0'
-import os
from setuptools import setup, find_packages
-def read(*rnames):
- return open(os.path.join(os.path.dirname(__file__), *rnames)).read()
+readme = open('README.txt').read()
-long_description=(
- read('README.txt')
- + '\n' +
- 'Detailed Documentation\n'
- '**********************\n'
- + '\n' +
- read('src', 'zc', 'ngi', 'README.txt')
- + '\n' +
- read('src', 'zc', 'ngi', 'blocking.txt')
- + '\n' +
- read('src', 'zc', 'ngi', 'adapters.txt')
- + '\n' +
- read('src', 'zc', 'ngi', 'async.txt')
- + '\n' +
- 'Download\n'
- '**********************\n'
- )
+tests_require = ['zope.testing', 'manuel']
-open('documentation.txt', 'w').write(long_description)
-
setup(
name = name, version=version,
author = "Jim Fulton",
author_email = "jim at zope.com",
- description = "Network Gateway Interface",
+ description = readme.split('\n', 1)[0],
license = "ZPL 2.1",
- keywords = "network",
- url='http://www.python.org/pypi/'+name,
- long_description=long_description,
+ keywords = ["networking", "testing"],
+ url='http://packages.python.org/'+name,
+ long_description=readme,
packages = find_packages('src'),
include_package_data = True,
package_dir = {'':'src'},
namespace_packages = ['zc'],
install_requires = ['setuptools'],
- extras_require = dict(
- test = ['zope.testing'],
- ),
+ extras_require = dict(test=tests_require),
zip_safe = False,
)
Modified: zc.ngi/trunk/src/zc/ngi/adapters.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/adapters.py 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/adapters.py 2010-06-20 15:53:58 UTC (rev 113716)
@@ -12,48 +12,64 @@
#
##############################################################################
"""NGI connection adapters
-
-$Id$
"""
import struct
+import zc.ngi.generator
-class Lines:
+class Base(object):
def __init__(self, connection):
self.connection = connection
- self.close = connection.close
- self.write = connection.write
- self.writelines = connection.writelines
+ def close(self):
+ self.connection.close()
+
+ def write(self, data):
+ self.write = self.connection.write
+ self.write(data)
+
+ def writelines(self, data):
+ self.writelines = self.connection.writelines
+ self.writelines(data)
+
def setHandler(self, handler):
self.handler = handler
- self.input = ''
self.connection.setHandler(self)
def handle_input(self, connection, data):
+ handle_input = self.handler.handle_input
+ self.handle_input(connection, data)
+
+ def handle_close(self, connection, reason):
+ self.handler.handle_close(connection, reason)
+
+ def handle_exception(self, connection, reason):
+ self.handler.handle_exception(connection, reason)
+
+ @classmethod
+ def handler(class_, func):
+ return zc.ngi.generator.handler(func, class_)
+
+class Lines(Base):
+
+ input = ''
+
+ def handle_input(self, connection, data):
self.input += data
data = self.input.split('\n')
self.input = data.pop()
for line in data:
self.handler.handle_input(self, line)
- def handle_close(self, connection, reason):
- self.handler.handle_close(self, reason)
+class Sized(Base):
-class Sized:
-
- def __init__(self, connection):
- self.connection = connection
- self.close = connection.close
-
+ want = 4
+ got = 0
+ getting_size = True
def setHandler(self, handler):
- self.handler = handler
self.input = []
- self.want = 4
- self.got = 0
- self.getting_size = True
- self.connection.setHandler(self)
+ Base.setHandler(self, handler)
def handle_input(self, connection, data):
self.got += len(data)
@@ -84,11 +100,20 @@
self.getting_size = True
self.handler.handle_input(self, collected)
- def handle_close(self, connection, reason):
- self.handler.handle_close(self, reason)
+ def writelines(self, data):
+ self.connection.writelines(sized_iter(data))
def write(self, message):
if message is None:
self.connection.write('\xff\xff\xff\xff')
else:
- self.connection.write(struct.pack(">I", len(message))+message)
+ self.connection.write(struct.pack(">I", len(message)))
+ self.connection.write(message)
+
+def sized_iter(data):
+ for message in data:
+ if message is None:
+ yield '\xff\xff\xff\xff'
+ else:
+ yield struct.pack(">I", len(message))
+ yield message
Modified: zc.ngi/trunk/src/zc/ngi/adapters.txt
===================================================================
--- zc.ngi/trunk/src/zc/ngi/adapters.txt 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/adapters.txt 2010-06-20 15:53:58 UTC (rev 113716)
@@ -53,10 +53,19 @@
>>> adapter.write('foo')
-> 'foo'
- >>> adapter.writelines(['foo', 'bar'])
- -> 'foo'
- -> 'bar'
+ >>> adapter.writelines("%s\n" % foo for foo in range(3))
+ -> '0\n'
+ -> '1\n'
+ -> '2\n'
+.. again with feeling
+
+ >>> adapter.writelines("%s\n" % foo for foo in range(3))
+ -> '0\n'
+ -> '1\n'
+ -> '2\n'
+
+::
>>> connection.test_close('test')
-> CLOSE test
@@ -98,8 +107,20 @@
message size:
>>> adapter.write(message1)
- -> '\x00\x00\x00\x19Hello\nWorld!\nHow are you?'
+ -> '\x00\x00\x00\x19'
+ -> 'Hello\nWorld!\nHow are you?'
+We can give multiple messages using writelines:
+
+ >>> adapter.writelines("%s\n" % foo for foo in range(3))
+ -> '\x00\x00\x00\x02'
+ -> '0\n'
+ -> '\x00\x00\x00\x02'
+ -> '1\n'
+ -> '\x00\x00\x00\x02'
+ -> '2\n'
+
+
Null messages
-------------
Modified: zc.ngi/trunk/src/zc/ngi/async.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.py 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/async.py 2010-06-20 15:53:58 UTC (rev 113716)
@@ -20,11 +20,9 @@
import errno
import logging
import os
-import select
import socket
import sys
import threading
-import time
import zc.ngi
@@ -81,7 +79,7 @@
self.__connected = True
self.__closed = None
self.__handler = None
- self.__exception = None
+ self.__iterator_exception = None
self.__output = []
dispatcher.__init__(self, sock, addr)
self.logger = logger
@@ -94,14 +92,14 @@
raise TypeError("Handler already set")
self.__handler = handler
- if self.__exception:
- exception = self.__exception
- self.__exception = None
+ if self.__iterator_exception:
+ v = self.__iterator_exception
+ self.__iterator_exception = None
try:
- handler.handle_exception(self, exception)
+ handler.handle_exception(self, v)
except:
self.logger.exception("handle_exception failed")
- return self.handle_close("handle_exception failed")
+ raise
if self.__closed:
try:
@@ -159,7 +157,7 @@
self.__handler.handle_input(self, d)
except:
self.logger.exception("handle_input failed")
- self.handle_close("handle_input failed")
+ raise
if len(d) < BUFFER_SIZE:
break
@@ -179,16 +177,22 @@
# Must be an iterator
try:
v = v.next()
+ if not isinstance(v, str):
+ raise TypeError(
+ "writelines iterator must return strings",
+ v)
except StopIteration:
# all done
output.pop(0)
continue
+ except Exception, v:
+ self.logger.exception("writelines iterator failed")
+ if self.__handler is None:
+ self.__iterator_exception = v
+ raise
+ else:
+ self.__handler.handle_exception(self, v)
- if __debug__ and not isinstance(v, str):
- exc = TypeError("iterable output returned a non-string", v)
- self.__report_exception(exc)
- raise exc
-
output.insert(0, v)
if not v:
@@ -202,7 +206,7 @@
return # we couldn't write anything
raise
except Exception, v:
- self.__report_exception(v)
+ self.logger.exception("send failed")
raise
if n == len(v):
@@ -211,16 +215,6 @@
output[0] = v[n:]
return # can't send any more
- def __report_exception(self, exception):
- if self.__handler is not None:
- try:
- self.__handler.handle_exception(self, exception)
- except:
- self.logger.exception("handle_exception failed")
- self.handle_close("handle_exception failed")
- else:
- self.__exception = exception
-
def handle_close(self, reason='end of input'):
if __debug__:
self.logger.debug('close %r', reason)
@@ -259,6 +253,8 @@
_CONNECT_OK = (0, errno.EISCONN)
def __init__(self, addr, handler):
+ if not _thread:
+ start_thread()
self.__handler = handler
if isinstance(addr, str):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -356,6 +352,8 @@
logger = logging.getLogger('zc.ngi.async.server')
def __init__(self, addr, handler):
+ if not _thread:
+ start_thread()
self.__handler = handler
self.__close_handler = None
self.__connections = {}
@@ -368,7 +366,25 @@
try:
if not is_win32:
self.set_reuse_addr()
- self.bind(addr)
+ if addr is None:
+ # Try to pick one, primarily for testing
+ import random
+ n = 0
+ while 1:
+ port = random.randint(10000, 30000)
+ addr = 'localhost', port
+ try:
+ self.bind(addr)
+ except socket.error:
+ n += 1
+ if n > 100:
+ raise
+ else:
+ continue
+ break
+ else:
+ self.bind(addr)
+
self.logger.info("listening on %r", addr)
self.listen(255)
except socket.error:
@@ -376,6 +392,7 @@
self.logger.warn("unable to listen on %r", addr)
raise
self.add_channel(_map)
+ self.address = addr
notify_select()
def handle_accept(self):
@@ -416,7 +433,7 @@
def close(self, handler=None):
self.accepting = False
self.del_channel(_map)
- self.socket.close()
+ call_from_thread(self.socket.close)
if handler is None:
for c in list(self.__connections):
c.handle_close("stopped")
@@ -425,12 +442,18 @@
else:
self.__close_handler = handler
+ # convenience method made possible by storing our address:
+ def connect(self, handler):
+ connect(self.address, handler)
+
class udp_listener(BaseListener):
logger = logging.getLogger('zc.ngi.async.udpserver')
connected = True
def __init__(self, addr, handler, buffer_size=4096):
+ if not _thread:
+ start_thread()
self.__handler = handler
self.__buffer_size = buffer_size
asyncore.dispatcher.__init__(self)
@@ -457,7 +480,7 @@
def close(self):
self.del_channel(_map)
- self.socket.close()
+ call_from_thread(self.socket.close)
# udp uses GIL to get thread-safe socket management
if is_win32:
@@ -484,6 +507,9 @@
logger = logging.getLogger('zc.ngi.async.trigger')
+ def __init__(self):
+ self.callbacks = []
+
def writable(self):
return 0
@@ -495,15 +521,23 @@
self.close()
def handle_read(self):
+ while self.callbacks:
+ callback = self.callbacks.pop(0)
+ try:
+ callback()
+ except:
+ self.logger.exception('Calling callback')
+
try:
self.recv(BUFFER_SIZE)
except socket.error:
- return
+ pass
if os.name == 'posix':
class _Trigger(_Triggerbase, asyncore.file_dispatcher):
def __init__(self):
+ _Triggerbase.__init__(self)
self.__readfd, self.__writefd = os.pipe()
asyncore.file_dispatcher.__init__(self, self.__readfd)
@@ -528,6 +562,7 @@
class _Trigger(_Triggerbase, asyncore.dispatcher):
def __init__(self):
+ _Triggerbase.__init__(self)
# Get a pair of connected sockets. The trigger is the 'w'
# end of the pair, which is connected to 'r'. 'r' is put
@@ -596,6 +631,10 @@
notify_select = _trigger.pull_trigger
+def call_from_thread(func):
+ _trigger.callbacks.append(func)
+ notify_select()
+
def loop():
timeout = 30.0
map = _map
@@ -611,9 +650,20 @@
try:
asyncore.poll(timeout, map)
except:
+ print sys.exc_info()[0]
logger.exception('loop error')
raise
-_thread = threading.Thread(target=loop, name=__name__)
-_thread.setDaemon(True)
-_thread.start()
+_thread = None
+_start_lock = threading.Lock()
+def start_thread(daemon=True):
+ global _thread
+ _start_lock.acquire()
+ try:
+ if _thread is not None:
+ return
+ _thread = threading.Thread(target=loop, name=__name__)
+ _thread.setDaemon(daemon)
+ _thread.start()
+ finally:
+ _start_lock.release()
Modified: zc.ngi/trunk/src/zc/ngi/async.txt
===================================================================
--- zc.ngi/trunk/src/zc/ngi/async.txt 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/async.txt 2010-06-20 15:53:58 UTC (rev 113716)
@@ -94,7 +94,7 @@
>>> input.readline()
Traceback (most recent call last):
...
- TypeError: ('iterable output returned a non-string', 2)
+ TypeError: ('writelines iterator must return strings', 2)
If there is an error, then the connection is closed:
@@ -164,9 +164,11 @@
>>> print loghandler
zc.ngi.async.client ERROR
handle_input failed
+ zc.ngi.async.client ERROR
+ handle_error
>>> handler.closed
- 'handle_input failed'
+ TypeError('handle_input() takes exactly 2 arguments (3 given)',)
>>> loghandler.uninstall()
@@ -225,9 +227,11 @@
>>> time.sleep(0.1)
>>> listener2 = zc.ngi.async.listener(('127.0.0.1', 9645), handler)
+ ... # doctest: +ELLIPSIS
Traceback (most recent call last):
...
- error: (98, 'Address already in use')
+ error:...Address already in use...
+
>>> time.sleep(0.1)
>>> logcontent = str(loghandler)
Modified: zc.ngi/trunk/src/zc/ngi/blocking.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/blocking.py 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/blocking.py 2010-06-20 15:53:58 UTC (rev 113716)
@@ -11,13 +11,9 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""File-like network interface
-
-$Id$
-"""
-
-import threading, time
-
+import sys
+import threading
+import time
import zc.ngi
class ConnectionFailed(Exception):
@@ -32,7 +28,119 @@
"""An attempt to connect timed out.
"""
-def connect(address, connect, timeout=None):
+class RequestConnection:
+
+ def __init__(self, connection, connector):
+ self.connection = connection
+ self.connector = connector
+
+ def write(self, data):
+ self.write = self.connection.write
+ self.write(data)
+
+ def writelines(self, data):
+ self.writelines = self.connection.writelines
+ self.writelines(data)
+
+ def close(self):
+ self.connector.closed = True
+ self.connection.close()
+ self.connector.event.set()
+
+ def setHandler(self, handler):
+ self.handler = handler
+ self.connection.setHandler(self)
+
+ def handle_input(self, connection, data):
+ try:
+ self.handler.handle_input(self, data)
+ except:
+ self.connector.exception = sys.exc_info()
+ self.connector.event.set()
+ raise
+
+ def handle_close(self, connection, reason):
+ handle_close = getattr(self.handler, 'handle_close', None)
+ if handle_close is not None:
+ try:
+ handle_close(self, reason)
+ except:
+ self.connector.exception = sys.exc_info()
+ self.connector.event.set()
+ raise
+
+ self.connector.closed = True
+ self.connector.result = reason
+ self.connector.event.set()
+
+ @property
+ def handle_exception(self):
+ handle = self.handler.handle_exception
+ def handle_exception(connection, exception):
+ try:
+ handle(self, exception)
+ except:
+ self.connector.exception = sys.exc_info()
+ self.connector.event.set()
+ raise
+ return handle_exception
+
+
+class RequestConnector:
+
+ exception = closed = connection = result = None
+
+ def __init__(self, handler, event):
+ try:
+ connected = handler.connected
+ except AttributeError:
+ if callable(handler):
+ connected = handler
+ elif getattr(handler, 'handle_input', None) is None:
+ raise
+ else:
+ connected = lambda connection: connection.setHandler(handler)
+
+ self._connected = connected
+ self.event = event
+
+ def connected(self, connection):
+ self.connection = connection
+ try:
+ self._connected(RequestConnection(connection, self))
+ except:
+ self.exception = sys.exc_info()
+ self.event.set()
+ raise
+
+ def failed_connect(self, reason):
+ self.exception = ConnectionFailed(reason)
+ self.event.set()
+
+def request(connect, address, connection_handler, timeout=None):
+ event = threading.Event()
+ connector = RequestConnector(connection_handler, event)
+ connect(address, connector)
+ event.wait(timeout)
+
+ if connector.exception:
+ exception = connector.exception
+ del connector.exception
+ if isinstance(exception, tuple):
+ raise exception[0], exception[1], exception[2]
+ else:
+ raise exception
+
+ if connector.closed:
+ return connector.result
+
+ if connector.connection is None:
+ raise ConnectionTimeout
+ raise Timeout
+
+def connect(address, connect=None, timeout=None):
+ if connect is None:
+ connect = zc.ngi.implementation.connect
return _connector().connect(address, connect, timeout)
class _connector:
@@ -58,7 +166,8 @@
self.event.set()
def open(connection_or_address, connector=None, timeout=None):
- if connector is None:
+ if connector is None and hasattr(connection_or_address, 'setHandler'):
+ # connection_or_address is a connection
connection = connection_or_address
else:
connection = connect(connection_or_address, connector, timeout)
Modified: zc.ngi/trunk/src/zc/ngi/blocking.txt
===================================================================
--- zc.ngi/trunk/src/zc/ngi/blocking.txt 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/blocking.txt 2010-06-20 15:53:58 UTC (rev 113716)
@@ -3,7 +3,7 @@
=======================
The NGI normally uses an event-based networking model in which
-application code reactes to incoming data. That model works well for
+application code reacts to incoming data. That model works well for
some applications, especially server applications, but can be a bit of
a bother for simpler applications, especially client applications.
Property changes on: zc.ngi/trunk/src/zc/ngi/doc
___________________________________________________________________
Added: svn:ignore
+ _build
Modified: zc.ngi/trunk/src/zc/ngi/doc/index.txt
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/doc/index.txt 2010-06-20 12:24:02 UTC (rev 113714)
+++ zc.ngi/trunk/src/zc/ngi/doc/index.txt 2010-06-20 15:53:58 UTC (rev 113716)
@@ -203,12 +203,7 @@
.. -> src
- >>> import sys
- >>> if sys.version_info >= (2, 5):
- ... exec(src)
- ... else:
- ... def wc(connection):
- ... connection.setHandler(WC())
+ >>> exec(src)
The generator takes a connection object and gets data via ``yield``
statements. The yield statements can raise exceptions. In
@@ -552,10 +547,7 @@
.. -> src
- >>> if sys.version_info >= (2, 5):
- ... exec(src)
- ... else:
- ... WCClientG = WCClient
+ >>> exec(src)
>>> wcc = WCClientG('first one\nsecond one')
>>> connection = zc.ngi.testing.Connection()
@@ -908,14 +900,13 @@
.. -> src
- >>> if sys.version_info >= (2, 5):
- ... exec(src)
- ... listener = zc.ngi.testing.listener(wcadapted)
- ... connection = listener.connect()
- ... connection.write('15')
- ... connection.write('\nhello out\nthere')
- ... listener.close()
+ >>> exec(src)
+ >>> listener = zc.ngi.testing.listener(wcadapted)
+ >>> connection = listener.connect()
+ >>> connection.write('15')
+ >>> connection.write('\nhello out\nthere')
-> '2 3\n'
+ >>> listener.close()
By separating the low-level protocol handling from the application
logic, we can reuse the low-level protocol in other applications, and
@@ -948,14 +939,11 @@
.. -> src
- >>> if sys.version_info >= (2, 5):
- ... exec(src)
- ... connection = zc.ngi.testing.Connection()
- ... handler = example(connection)
- ... connection.peer.write('Hi')
- ... print 'nothing yet :)'
- ... connection.peer.write(' world!\n')
- nothing yet :)
+ >>> exec(src)
+ >>> connection = zc.ngi.testing.Connection()
+ >>> handler = example(connection)
+ >>> connection.peer.write('Hi')
+ >>> connection.peer.write(' world!\n')
Hi world!
-> CLOSE
Copied: zc.ngi/trunk/src/zc/ngi/generator.py (from rev 113714, zc.ngi/branches/jim-dev/src/zc/ngi/generator.py)
===================================================================
--- zc.ngi/trunk/src/zc/ngi/generator.py (rev 0)
+++ zc.ngi/trunk/src/zc/ngi/generator.py 2010-06-20 15:53:58 UTC (rev 113716)
@@ -0,0 +1,71 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+
+def handler(func=None, connection_adapter=None):
+ if func is None:
+ return lambda func: Handler(func, connection_adapter)
+ return Handler(func, connection_adapter)
+
+class Handler(object):
+
+ def __init__(self, func, connection_adapter):
+ self.func = func
+ self.connection_adapter = connection_adapter
+
+ def __call__(self, *args):
+ if self.connection_adapter is not None:
+ args = args[:-1]+(self.connection_adapter(args[-1]), )
+ return ConnectionHandler(self.func(*args), args[-1])
+
+ def __get__(self, inst, class_):
+ if inst is None:
+ return self
+
+ if self.connection_adapter is not None:
+ def connected(connection):
+ connection = self.connection_adapter(connection)
+ return ConnectionHandler(self.func(inst, connection),
+ connection)
+ return connected
+
+ return (lambda connection:
+ ConnectionHandler(self.func(inst, connection), connection)
+ )
+
+class ConnectionHandler(object):
+
+ def __init__(self, gen, connection):
+ try:
+ gen.next()
+ except StopIteration:
+ return
+
+ self.gen = gen
+ connection.setHandler(self)
+
+ def handle_input(self, connection, data):
+ try:
+ self.gen.send(data)
+ except StopIteration:
+ connection.close()
+
+ def handle_close(self, connection, reason):
+ try:
+ self.gen.throw(GeneratorExit, GeneratorExit(reason))
+ except (GeneratorExit, StopIteration):
+ pass
+
+ def handle_exception(self, connection, exception):
+ self.gen.throw(exception.__class__, exception)
Modified: zc.ngi/trunk/src/zc/ngi/interfaces.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/interfaces.py 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/interfaces.py 2010-06-20 15:53:58 UTC (rev 113716)
@@ -11,53 +11,15 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Network Gateway Interface (NGI)
-The interfaces are split between "implementation" and "application"
-interfaces. An implementation of the NGI provides IImplementation,
-IConnection, IServerConnection, IServerControl, and IUDPServerControl.
-An TCP application provides IConnectionHandler and one or both of
-IClientConnectHandler and IServer. A UDP server application might
-provide IUDPHandler.
+try:
+ from zope.interface import Interface, Attribute
+except ImportError:
+ class Interface:
+ pass
+ def Attribute(text):
+ return text
-The NGI is an event-based framework in the sense that applications
-register handlers that respond to input events. There are 4 kinds of
-handlers:
-
-- Input handlers receive network input and notification of connection
- closes and exceptions,
-
-- Client-connect handlers respond to outbound connection events, and
-
-- Servers respond to incoming connection events.
-
-- UDP handlers respond to incoming UDP messages.
-
-The interfaces are designed to allow single-threaded applications:
-
-- An implementation of the interfaces is not allowed to make multiple
- simultaneous calls to the same application handler. (Note that this
- requirement does not extend across multiple implementations.
- Theoretically, different implementations could call handlers at the
- same time.)
-
- Note that when an application calls setHandler on a connection, the
- connection handler may have it's methods called immediately with
- pending input or notifications.
-
-- All handler calls that are associated with a connection include the
- connection as a parameter, This allows a single handler object to
- respond to events from multiple connections.
-
-Applications may be multi-threaded. This means that implementations
-must be thread safe. This means that, unless otherwise stated, calls
-into the implementation could be made at any time.
-
-$Id$
-"""
-
-from zope.interface import Interface, Attribute
-
class IImplementation(Interface):
"""Standard interface for ngi implementations
"""
@@ -69,6 +31,9 @@
connected method will be called with an IConnection object
if and when the connection succeeds or failed_connect method
will be called if the connection fails.
+
+ This method is thread safe. It may be called by any thread at
+ any time.
"""
def listener(address, handler):
@@ -77,18 +42,27 @@
When a connection is received, call the handler.
An IListener object is returned.
+
+ This method is thread safe. It may be called by any thread at
+ any time.
"""
def udp(address, message):
"""Send a UDP message
+
+ This method is thread safe. It may be called by any thread at
+ any time.
"""
- def udp_listen(address, handler, buffer_size=4096):
+ def udp_listener(address, handler, buffer_size=4096):
"""Listen for incoming UDP messages
When a message is received, call the handler with the message.
An IUDPListener object is returned.
+
+ This method is thread safe. It may be called by any thread at
+ any time.
"""
class IConnection(Interface):
@@ -124,6 +98,9 @@
"""Output a string to the connection.
The write call is non-blocking.
+
+ This method is thread safe. It may be called by any thread at
+ any time.
"""
def writelines(data):
@@ -131,10 +108,16 @@
The writelines call is non-blocking. Note, that the data may
not have been consumed when the method returns.
+
+ This method is thread safe. It may be called by any thread at
+ any time.
"""
def close():
"""Close the connection
+
+ This method is thread safe. It may be called by any thread at
+ any time.
"""
class IServerConnection(IConnection):
@@ -161,11 +144,10 @@
The data is an 8-bit string.
- Note that there are no promises about blocking. The data
- isn't necessarily record oriented. For example, data could,
- in theory be passed one character at a time. It is up to
- applications to organize data into records, if desired.
-
+ Note that there are no promises about data organization. The
+ data isn't necessarily record oriented. For example, data
+ could, in theory be passed one character at a time. It is up
+ to applications to organize data into records, if desired.
"""
def handle_close(connection, reason):
@@ -237,6 +219,8 @@
This is an implementation interface.
"""
+ address = Attribute("The address the listener is listening on.")
+
def connections():
"""return an iterable of the current connections
"""
@@ -263,143 +247,3 @@
def close():
"""Close the listener
"""
-
-class IBlocking(Interface):
- """Top-level blocking interface provided by the blocking module
- """
-
- def connect(address, connect, timeout=None):
- """Connect to the given address using the given connect callable
-
- A timout value may be given as a floating point number of
- seconds.
-
- If connection suceeds, an IConnection is returned, otherwise
- an exception is raised.
- """
-
- def open(connection_or_address, connect=None, timeout=None):
- """Get output and input files for a connection or address
-
- The first argument is either a connection or an address.
- If (and only if) it is an address, then a connect callable must be
- provided as the second argument and a connection is gotten by
- calling the connect function with the given address,
- connect callable, and timeout.
-
- A pair of file-like objects is returned. The first is an
- output file-like object, an IBlockingOutput, for sending
- output to the connection. The second file-like object is an
- input file-like object, an IBlockingInput, for reading data
- from the connection.
- """
-
-class IBlockingPositionable(Interface):
- """File-like objects with file positions.
-
- To mimic file objects, working seek and tell methods are provided
- that report and manipulate pseudo file positions. The file
- position starts at zero and is advanced by reading or writing
- data. It can be adjusted (pointlessly) by the seek method.
- """
-
- def tell():
- """Return the current file position.
- """
-
- def seek(offset, whence=0):
- """Reset the file position
-
- If whence is 0, then the file position is set to the offset.
-
- If whence is 1, the position is increased by the offset.
-
- If whence is 2, the position is decreased by the offset.
-
- An exception is raised if the position is set to a negative
- value.
- """
-
- def close():
- """Close the connection.
- """
-
-class IBlockingOutput(IBlockingPositionable):
- """A file-like object for sending output to a connection.
- """
-
- def flush():
- """Do nothing.
- """
-
- def write(data):
- """Write a string to the connection.
-
- The function will return immediately. The data may be queued.
- """
-
- def writelines(iterable, timeout=0, nonblocking=False):
- """Write an iterable of strings to the connection.
-
- By default, the call will block until the data from the
- iterable has been consumed. If a true value is passed to the
- non-blocking keyword argument, then the function will return
- immediately. The iterable will be consumed at some later time.
-
- In (the default) blocking mode, a timeout may be provided to
- limit the time that the call will block. If the timeout
- expires, a zc.ngi.blocking.Timeout excation will be raised.
- """
-
-class IBlockingInput(IBlockingPositionable):
- """A file-like object for reading input from a connection.
- """
-
- def read(size=None, timeout=None):
- """Read data
-
- If a size is specified, then that many characters are read,
- blocking of necessary. If no size is specified (or if size is
- None), then all remaining input data are read.
-
- A timeout may be specified as a floating point number of
- seconds to wait. A zc.ngi.blocking.Timeout exception will be
- raised if the data cannot be read in the number of seconds given.
- """
-
- def readline(size=None, timeout=None):
- """Read a line of data
-
- If a size is specified, then the lesser of that many
- characters or a single line of data are read, blocking of
- necessary. If no size is specified (or if size is None), then
- a single line are read.
-
- A timeout may be specified as a floating point number of
- seconds to wait. A zc.ngi.blocking.Timeout exception will be
- raised if the data cannot be read in the number of seconds given.
- """
-
- def readlines(sizehint=None, timeout=None):
- """Read multiple lines of data
-
- If a sizehint is specified, then one or more lines of data are
- returned whose total length is less than or equal to the size
- hint, blocking if necessary. If no sizehint is specified (or
- if sizehint is None), then the remainder of input, split into
- lines, is returned.
-
- A timeout may be specified as a floating point number of
- seconds to wait. A zc.ngi.blocking.Timeout exception will be
- raised if the data cannot be read in the number of seconds given.
- """
-
- def __iter__():
- """Return the input object
- """
-
- def next():
- """Return a line of input
-
- Raises StopIteration if there is no more input.
- """
Modified: zc.ngi/trunk/src/zc/ngi/testing.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/testing.py 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/testing.py 2010-06-20 15:53:58 UTC (rev 113716)
@@ -67,8 +67,25 @@
method, args = self.queue.pop(0)
if self.closed and method != 'handle_close':
break
+
try:
- getattr(self.handler, method)(self, *args)
+ try:
+ handler = getattr(self.handler, method)
+ except AttributeError:
+ if method == 'handle_close':
+ return # Optional method
+ elif method == 'handle_exception':
+ # Unhandled exception
+ self.close()
+ handler = getattr(self.handler, 'handle_close',
+ None)
+ if handler is None:
+ return
+ args = self, 'unhandled exception'
+ else:
+ raise
+
+ handler(self, *args)
except:
print "Error test connection calling connection handler:"
traceback.print_exc(file=sys.stdout)
@@ -156,14 +173,29 @@
Connection.__init__(self, peer, handler)
_connectable = {}
-
+_recursing = object()
def connect(addr, handler):
connections = _connectable.get(addr)
- if connections:
- handler.connected(connections.pop(0))
- else:
- handler.failed_connect('no such server')
+ if isinstance(connections, list):
+ if connections:
+ return handler.connected(connections.pop(0))
+ elif isinstance(connections, listener):
+ return handler.connected(connections.connect())
+ elif connections is _recursing:
+ print (
+ "For address, %r, a connect handler called connect from a\n"
+ "failed_connect call."
+ % (addr, ))
+ del _connectable[addr]
+ return
+ _connectable[addr] = _recursing
+ handler.failed_connect('no such server')
+ try:
+ del _connectable[addr]
+ except KeyError:
+ pass
+
connector = connect
def connectable(addr, connection):
@@ -171,21 +203,33 @@
class listener:
- def __init__(self, handler):
+ def __init__(self, addr, handler=None):
+ if handler is None:
+ handler = addr
+ addr = None
+ else:
+ _connectable[addr] = self
+ self.address = addr
self._handler = handler
self._close_handler = None
self._connections = []
- def connect(self, connection, handler=None):
+ def connect(self, connection=None, handler=None):
if handler is not None:
# connection is addr in this case and is ignored
handler.connected(Connection(None, self._handler))
return
if self._handler is None:
raise TypeError("Listener closed")
+ if connection is None:
+ connection = Connection()
+ peer = connection.peer
+ else:
+ peer = None
self._connections.append(connection)
connection.control = self
self._handler(connection)
+ return peer
connector = connect
@@ -193,6 +237,8 @@
return iter(self._connections)
def close(self, handler=None):
+ if self.address is not None:
+ del _connectable[self.address]
self._handler = None
if handler is None:
while self._connections:
Modified: zc.ngi/trunk/src/zc/ngi/tests.py
===================================================================
--- zc.ngi/trunk/src/zc/ngi/tests.py 2010-06-20 15:53:56 UTC (rev 113715)
+++ zc.ngi/trunk/src/zc/ngi/tests.py 2010-06-20 15:53:58 UTC (rev 113716)
@@ -11,16 +11,21 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""XXX short summary goes here.
-
-$Id$
-"""
-import threading, unittest
-from zope.testing import doctest
+import doctest
+import logging
+import manuel.capture
+import manuel.doctest
+import manuel.testing
+import sys
+import threading
+import unittest
+import zc.ngi.async
+import zc.ngi.generator
import zc.ngi.testing
-import zc.ngi.async
import zc.ngi.wordcount
+zc.ngi.async.start_thread() # Make sure the thread is already running
+
def test_async_cannot_connect():
"""Let's make sure that the connector handles connection failures correctly
@@ -122,12 +127,189 @@
True
>>> s.close()
+ """
+def async_error_in_client_when_conection_is_closed():
+ """
+If a connection is closed, we need to make sure write calls generate errors.
+ >>> logger = logging.getLogger('zc.ngi')
+ >>> log_handler = logging.StreamHandler(sys.stdout)
+ >>> logger.addHandler(log_handler)
+ >>> logger.setLevel(logging.WARNING)
+ >>> server_event = threading.Event()
+ >>> @zc.ngi.generator.handler
+ ... def server(conn):
+ ... data = yield
+ ... print data
+ ... server_event.set()
+ >>> listener = zc.ngi.async.listener(None, server)
+
+ >>> class Connector:
+ ... def __init__(self):
+ ... self.event = threading.Event()
+ ... def connected(self, conn):
+ ... self.conn = conn
+ ... self.event.set()
+
+ >>> connector = Connector()
+ >>> zc.ngi.async.connect(listener.address, connector)
+ >>> connector.event.wait(1)
+
+OK, we've connected. If we close the connection, we won't be able to write:
+
+ >>> connector.conn.close()
+ >>> connector.conn.write('xxx')
+
+ >>> connector.conn.writelines(['xxx', 'yyy'])
+
+Similarly if the server closes the connection:
+
+ >>> connector = Connector()
+ >>> zc.ngi.async.connect(listener.address, connector)
+ >>> connector.event.wait(1)
+
+ >>> connector.conn.write('aaa'); server_event.wait(1)
+ aaa
+
+ >>> connector.conn.write('xxx')
+
+ >>> connector.conn.writelines(['xxx', 'yyy'])
+
+
+ >>> logger.removeHandler(log_handler)
+ >>> logger.setLevel(logging.NOTSET)
+
"""
+def when_a_server_closes_a_connection_blocking_request_returns_reason():
+ """
+
+ >>> import zc.ngi.adapters, zc.ngi.async, zc.ngi.blocking
+ >>> @zc.ngi.adapters.Sized.handler
+ ... def echo1(c):
+ ... c.write((yield))
+
+ >>> listener = zc.ngi.async.listener(None, echo1)
+ >>> @zc.ngi.adapters.Sized.handler
+ ... def client(c):
+ ... c.write('test')
+ ... print '1', (yield)
+ ... print '2', (yield)
+ >>> zc.ngi.blocking.request(zc.ngi.async.connect, listener.address,
+ ... client, 1)
+ ... # doctest: +ELLIPSIS
+ 1...
+ 'end of input'
+ >>> listener.close()
+ """
+
+def errors_raised_by_handler_should_be_propigated_by_blocking_request():
+ """
+ Errors raised by handlers should propagate to the request caller,
+ rather than just getting logged as usual.
+
+ Note that this test also exercises error handling in zc.ngi.async.
+
+ >>> from zc.ngi import async
+ >>> from zc.ngi.adapters import Sized
+ >>> from zc.ngi.blocking import request
+
+ >>> @Sized.handler
+ ... def echo(c):
+ ... while 1:
+ ... data = (yield)
+ ... if data == 'stop': break
+ ... c.write(data)
+
+ >>> listener = async.listener(None, echo)
+
+ Handle error in setup
+
+ >>> @Sized.handler
+ ... def bad(c):
+ ... raise ValueError
+
+ >>> try: request(async.connect, listener.address, bad, 1)
+ ... except ValueError: pass
+ ... else: print 'oops'
+
+ Handle error in input
+
+ >>> @Sized.handler
+ ... def bad(c):
+ ... c.write('test')
+ ... data = (yield)
+ ... raise ValueError
+
+ >>> try: request(async.connect, listener.address, bad, 1)
+ ... except ValueError: pass
+ ... else: print 'oops'
+
+ Handle error in close
+
+ >>> @Sized.handler
+ ... def bad(c):
+ ... c.write('stop')
+ ... try:
+ ... while 1:
+ ... data = (yield)
+ ... except GeneratorExit:
+ ... raise ValueError
+
+ >>> try: request(async.connect, listener.address, bad, 1)
+ ... except ValueError: pass
+ ... else: print 'oops'
+
+ Handle error in handle_exception arising from error during iteration:
+
+ >>> @Sized.handler
+ ... def bad(c):
+ ... c.writelines(XXX for i in range(2))
+ ... data = (yield)
+
+ >>> try: request(async.connect, listener.address, bad, 1)
+ ... except NameError: pass
+ ... else: print 'oops'
+
+ >>> listener.close()
+ """
+
+def async_handling_iteration_errors():
+ """
+
+ >>> from zc.ngi import async
+ >>> from zc.ngi.adapters import Sized
+ >>> from zc.ngi.blocking import request
+
+ >>> @Sized.handler
+ ... def echo(c):
+ ... while 1:
+ ... data = (yield)
+ ... if data == 'stop': break
+ ... c.write(data)
+
+ >>> listener = async.listener(None, echo)
+
+ Handler with no handle_exception but with a handle_close.
+
+ >>> event = threading.Event()
+ >>> class Bad:
+ ... def connected(self, connection):
+ ... connection.setHandler(self)
+ ... connection.writelines(XXX for i in range(2))
+ ... def handle_close(self, connection, reason):
+ ... print 'closed', reason
+ ... event.set()
+
+ >>> zc.ngi.async.connect(listener.address, Bad()); event.wait(1)
+ closed Bad instance has no attribute 'handle_exception'
+
+ >>> listener.close()
+ """
+
class BrokenConnect:
connected = failed_connect = __call__ = lambda: xxxxx
@@ -181,6 +363,10 @@
def test_suite():
return unittest.TestSuite([
+ manuel.testing.TestSuite(
+ manuel.capture.Manuel() + manuel.doctest.Manuel(),
+ 'doc/index.txt',
+ ),
doctest.DocFileSuite(
'README.txt',
'testing.test',
More information about the checkins
mailing list