[Checkins] SVN: zc.ngi/branches/jim-dev/ checkpoint

Jim Fulton jim at zope.com
Mon Sep 14 15:15:02 EDT 2009


Log message for revision 103973:
  checkpoint
  

Changed:
  U   zc.ngi/branches/jim-dev/src/zc/ngi/adapters.py
  U   zc.ngi/branches/jim-dev/src/zc/ngi/async.py
  U   zc.ngi/branches/jim-dev/src/zc/ngi/blocking.py
  U   zc.ngi/branches/jim-dev/src/zc/ngi/doc/contents.txt
  U   zc.ngi/branches/jim-dev/src/zc/ngi/doc/index.txt
  U   zc.ngi/branches/jim-dev/src/zc/ngi/generator.py
  U   zc.ngi/branches/jim-dev/src/zc/ngi/interfaces.py
  U   zc.ngi/branches/jim-dev/src/zc/ngi/testing.py
  U   zc.ngi/branches/jim-dev/todo.txt

-=-
Modified: zc.ngi/branches/jim-dev/src/zc/ngi/adapters.py
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/adapters.py	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/src/zc/ngi/adapters.py	2009-09-14 19:15:02 UTC (rev 103973)
@@ -17,42 +17,56 @@
 """
 import struct
 
-class Lines:
+class Base(object):
 
     def __init__(self, connection):
         self.connection = connection
-        self.close = connection.close
-        self.write = connection.write
 
+    def close(self):
+        self.connection.close()
+
+    def write(self, data):
+        self.write = self.connection.write
+        self.write(data)
+
+    def writelined(self, data):
+        self.writelined = self.connection.writelined
+        self.writelined(data)
+
     def setHandler(self, handler):
         self.handler = handler
-        self.input = ''
         self.connection.setHandler(self)
 
     def handle_input(self, connection, data):
+        handle_input = self.handler.handle_input
+        self.handle_input(connection, data)
+
+    def handle_close(self, connection, reason):
+        self.handler.handle_close(connection, reason)
+
+    def handle_exception(self, connection, reason):
+        self.handler.handle_exception(connection, reason)
+
+class Lines(Base):
+
+    input = ''
+
+    def handle_input(self, connection, data):
         self.input += data
         data = self.input.split('\n')
         self.input = data.pop()
         for line in data:
             self.handler.handle_input(self, line)
 
-    def handle_close(self, connection, reason):
-        self.handler.handle_close(self, reason)
 
+class Sized(Base):
 
-class Sized:
-
-    def __init__(self, connection):
-        self.connection = connection
-        self.close = connection.close
-
+    want = 4
+    got = 0
+    getting_size = True
     def setHandler(self, handler):
-        self.handler = handler
         self.input = []
-        self.want = 4
-        self.got = 0
-        self.getting_size = True
-        self.connection.setHandler(self)
+        Base.setHandler(self, handler)
 
     def handle_input(self, connection, data):
         self.got += len(data)
@@ -83,8 +97,8 @@
                 self.getting_size = True
                 self.handler.handle_input(self, collected)
 
-    def handle_close(self, connection, reason):
-        self.handler.handle_close(self, reason)
+    def writelines(self, data):
+        self.connection.writelines(sized_iter(data))
 
     def write(self, message):
         if message is None:
@@ -92,4 +106,12 @@
         else:
             self.connection.write(struct.pack(">I", len(message)))
             self.connection.write(message)
-    
+
+def sized_iter(data):
+    for message in data:
+        if message is None:
+            yield '\xff\xff\xff\xff'
+        else:
+            yield struct.pack(">I", len(message))
+            yield message
+

Modified: zc.ngi/branches/jim-dev/src/zc/ngi/async.py
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/async.py	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/src/zc/ngi/async.py	2009-09-14 19:15:02 UTC (rev 103973)
@@ -20,11 +20,9 @@
 import errno
 import logging
 import os
-import select
 import socket
 import sys
 import threading
-import time
 
 import zc.ngi
 
@@ -365,11 +363,31 @@
         try:
             self.set_reuse_addr()
             self.logger.info("listening on %r", self.addr)
-            self.bind(addr)
+            if addr is None:
+                # Try to pick one, primarily for testing
+                import random
+                n = 0
+                while 1:
+                    port = random.randint(10000, 30000)
+                    addr = 'localhost', port
+                    try:
+                        self.bind(addr)
+                    except socket.error:
+                        n += 1
+                        if n > 100:
+                            raise
+                        else:
+                            continue
+                    break
+            else:
+                self.bind(addr)
+
             self.listen(255)
         except socket.error:
             self.close()
             raise
+
+        self.address = addr
         notify_select()
 
     def handle_accept(self):
@@ -410,7 +428,7 @@
     def close(self, handler=None):
         self.accepting = False
         self.del_channel(_map)
-        self.socket.close()
+        call_from_thread(self.socket.close)
         if handler is None:
             for c in list(self.__connections):
                 c.handle_close("stopped")
@@ -419,6 +437,10 @@
         else:
             self.__close_handler = handler
 
+    # convenience method made possible by storaing out address:
+    def connect(self, handler):
+        connect(self.address, handler)
+
 class udp_listener(BaseListener):
 
     logger = logging.getLogger('zc.ngi.async.udpserver')
@@ -448,7 +470,7 @@
 
     def close(self):
         self.del_channel(_map)
-        self.socket.close()
+        call_from_thread(self.socket.close)
 
 # udp uses GIL to get thread-safe socket management
 _udp_socks = {socket.AF_INET: [], socket.AF_UNIX: []}
@@ -472,6 +494,9 @@
 
     logger = logging.getLogger('zc.ngi.async.trigger')
 
+    def __init__(self):
+        self.callbacks = []
+
     def writable(self):
         return 0
 
@@ -483,15 +508,23 @@
         self.close()
 
     def handle_read(self):
+        while self.callbacks:
+            callback = self.callbacks.pop(0)
+            try:
+                callback()
+            except:
+                self.logger.exception('Calling callback')
+
         try:
             self.recv(BUFFER_SIZE)
         except socket.error:
-            return
+            pass
 
 if os.name == 'posix':
 
     class _Trigger(_Triggerbase, asyncore.file_dispatcher):
         def __init__(self):
+            _Triggerbase.__init__(self)
             self.__readfd, self.__writefd = os.pipe()
             asyncore.file_dispatcher.__init__(self, self.__readfd)
 
@@ -516,6 +549,7 @@
 
     class _Trigger(_Triggerbase, asyncore.dispatcher):
         def __init__(self):
+            _Triggerbase.__init__(self)
 
             # Get a pair of connected sockets.  The trigger is the 'w'
             # end of the pair, which is connected to 'r'.  'r' is put
@@ -584,6 +618,10 @@
 
 notify_select = _trigger.pull_trigger
 
+def call_from_thread(func):
+    _trigger.callbacks.append(func)
+    notify_select()
+
 def loop():
     timeout = 30.0
     map = _map
@@ -598,6 +636,7 @@
         try:
             asyncore.poll(timeout, map)
         except:
+            print sys.exc_info()[0]
             logger.exception('loop error')
             raise
 

Modified: zc.ngi/branches/jim-dev/src/zc/ngi/blocking.py
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/blocking.py	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/src/zc/ngi/blocking.py	2009-09-14 19:15:02 UTC (rev 103973)
@@ -56,19 +56,19 @@
         self.connection.setHandler(self)
 
     def handle_input(self, connection, data):
-        self.handle_input = self.handler.hande_input
-        self.hande_input(connection, data)
+        self.handler.handle_input(self, data)
 
     def handle_close(self, connection, reason):
+        handle_close = getattr(self.handler, 'handle_close', None)
+        if handle_close is not None:
+            handle_close(self, reason)
         self.connector.closed = reason
         self.connector.event.set()
 
-    def handle_exception(self, connection, exception):
-        try:
-            self.handler.handle_exception(connection, exception)
-        except:
-            self.connector.exception = exception
-            raise
+    @property
+    def handle_exception(self):
+        handle = self.handler.handle_exception
+        return lambda c, exception: handle(self, exception)
 
 class RequestConnector:
 

Modified: zc.ngi/branches/jim-dev/src/zc/ngi/doc/contents.txt
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/doc/contents.txt	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/src/zc/ngi/doc/contents.txt	2009-09-14 19:15:02 UTC (rev 103973)
@@ -11,6 +11,8 @@
 .. toctree::
    :maxdepth: 2
 
+   index.txt
+
 Indices and tables
 ==================
 

Modified: zc.ngi/branches/jim-dev/src/zc/ngi/doc/index.txt
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/doc/index.txt	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/src/zc/ngi/doc/index.txt	2009-09-14 19:15:02 UTC (rev 103973)
@@ -17,30 +17,27 @@
 using ngi. Implementation interfaces are written by back-end
 implementors.
 
-NGI is primary an asynchronous networking library.  Applications
+NGI is primary an asynchronous event-driven networking library.  Applications
 provide handlers that respond to network events.  The application
 interfaces definee these handlers:
 
 IConnectionHandler
-    Application component that handles TCP network input.
+    Application component that handles TCP network input
 
 IClientConnectHandler
     Application component that handles successful or failed outgoing
-    TCP connections.
+    TCP connections
 
 IServer
-    Application callback to handle incoming connections.
+    Application callback to handle incoming connections
 
 IUDPHandler
-    Application callback to handle incoming UDP messages.
+    Application callback to handle incoming UDP messages
 
-NGI also provides a synchronous API implemented on top of the
-asynchronous API.
-
 The implemention APIs provide (or mimic) low-level networking APIs:
 
 IImplementation
-    APIs for implementing and connecting to TCP servers and for
+    API for implementing and connecting to TCP servers and for
     implementing and sending messages to UDP servers.
 
 IConnection
@@ -74,17 +71,18 @@
 
 There are only 3 methods in the interface, 2 of which are optional.
 Each of the 3 methods takes a connection object, implementing
-``IConnection``.  Typically, connection handlers will call the write,
-writelines, or close methods from the handler's handle input method.
-The writelines [#writelines] method takes an iteraable object.
+``IConnection``.  Typically, connection handlers will call the
+``write``, ``writelines``, or ``close`` methods from the handler's
+``handle_input`` method.  The ``writelines`` [#writelines] method
+takes an iteraable object.
 
-The handler's handle_close and handle_exception methods are optional.
-The handle_exception method is only called if an iterator created from
+The handler's ``handle_close`` and ``handle_exception`` methods are optional.
+The ``handle_exception`` method is only called if an iterator created from
 an iterable passed to writelines raises an exception.  If a call to
-handle_exception fails, an implementation will close the connection.
+``handle_exception`` fails, an implementation will close the connection.
 
-The handle_close method is called when a connection is closed other
-than through the connection handler calling the connection's close
+The ``handle_close`` method is called when a connection is closed other
+than through the connection handler calling the connection's ``close``
 method.  For many applications, this is uninteresting, which is why
 the method is optional.  Clients that maintain long-running
 conections, may try to create new connections when notified that a
@@ -94,9 +92,9 @@
 ---------------------------
 
 Testing a connection handler is very easy.  Just call it's methods
-passing suitable arguments. The zc.ngi.testing module provides a
+passing suitable arguments. The ``zc.ngi.testing`` module provides a
 connection implementation designed to make testing convenient.  For
-example, to test our Echo connection handler, we can use code like the
+example, to test our ``Echo`` connection handler, we can use code like the
 following:
 
     >>> import zc.ngi.testing
@@ -115,11 +113,11 @@
 ------------------------------------
 
 Let's look at a slightly more complicated example.  We'll implement
-simple word-count server connection handler that implements something
-akin to the Unix word-count command.  It takes a line of input
+a simple word-count server connection handler that implements something
+akin to the Unix ``wc`` command.  It takes a line of input
 containing a text length followed by length bytes of data.  After
 recieving the length bytes of data, it send back a line of data
-containing line, word, and character counts::
+containing line and word counts::
 
   class WC:
 
@@ -142,9 +140,7 @@
           self.input = self.input[self.count:]
           self.count = None
           connection.write(
-              '%d %d %d\n' % (
-                 len(data.split('\n')), len(data.split()), len(data)
-                 ))
+              '%d %d\n' % (len(data.split('\n')), len(data.split())))
 
 .. -> src
 
@@ -154,19 +150,19 @@
     >>> connection = zc.ngi.testing.Connection()
     >>> handler.handle_input(connection, '15')
     >>> handler.handle_input(connection, '\nhello out\nthere')
-    -> '2 3 15\n'
+    -> '2 3\n'
 
 Here, we ommitted the optional handle_close and handle_exception
 methods.  The implementation is a bit complicated. We have to use
 instance variables to keep track of state between calls.  Note that we
 can't count on data coming in a line at a time or make any assumptions
-about the amount of data we'll recieve in a handle_input call.  The
-logic is complicated by the fact that we have two modes of collecting
-input. In the first mode, we're collecting a length. In the second
-mode, we're collecting input for analysis.
+about the amount of data we'll recieve in a ``handle_input`` call.
+The logic is further complicated by the fact that we have two modes of
+collecting input. In the first mode, we're collecting a length. In the
+second mode, we're collecting input for analysis.
 
 Connection handlers can often be simplified by writing them as
-generators, using zc.ngi.generator.handler::
+generators, using the ``zc.ngi.generator.handler`` decorator::
 
     import zc.ngi.generator
 
@@ -182,9 +178,7 @@
                 input += (yield)
             data = input[:count]
             connection.write(
-                '%d %d %d\n' % (
-                   len(data.split('\n')), len(data.split()), len(data)
-                   ))
+                '%d %d\n' % (len(data.split('\n')), len(data.split())))
             input = input[count:]
 
 .. -> src
@@ -196,10 +190,10 @@
     ...     def wc(conection):
     ...         connection.setHandler(WC())
 
-The generator takes a connection object and gets data via yield
+The generator takes a connection object and gets data via ``yield``
 statements.  The yield statements can raise exceptions.  In
-particular, a GeneratorExit exception is raised when the connection is
-closed.  The yield statement will also (re)raise any exceptions raised
+particular, a ``GeneratorExit`` exception is raised when the connection is
+closed.  The ``yield`` statement will also (re)raise any exceptions raised
 by calling an iterator created from an iterable passed to writelines.
 
 A generator-based handler is instantiated by calling it with a
@@ -208,7 +202,7 @@
     >>> handler = wc(connection)
     >>> handler.handle_input(connection, '15')
     >>> handler.handle_input(connection, '\nhello out\nthere')
-    -> '2 3 15\n'
+    -> '2 3\n'
 
     >>> handler.handle_close(connection, 'done')
 
@@ -217,9 +211,8 @@
 
 Implementing servers is only slightly more involved that implementing
 connection handlers.  A server is just a callable that takes a
-connection.  It typically creates a connection handler and passes it
-to the connection's setHandler method.  We can create a server using
-the Echo conection handler::
+connection and gives it a handler.  For example, we can use a simple
+function to implement a server for the Echo handler::
 
     def echo_server(connection):
         connection.setHandler(Echo())
@@ -228,9 +221,8 @@
 
     >>> exec(src)
 
-Of course, it's simpler to just use a connection handler class as a
+It's usually simpler to just use a connection handler class as a
 server by calling setHandler in the constructor::
-The full echo server is::
 
   class Echo:
 
@@ -250,27 +242,49 @@
 
     >>> exec(src)
 
-Note that handlers created from generators can be used as servers
-directly.
+In this case, the class is a server. It's instances are connection
+handlers.
 
+Handlers created from generators can be used as servers directly.
+
+Listening for connections
+-------------------------
+
 Finally, we have to listen for connections on an address by calling an
-implementation's listener method.  NGI comes with 2 implementations,
-an impplementation based on the standard asyncore module,
-zc.ngi.async, and a testing implementation, zc.ngi.testing.  To
-listen for real network connections on localhost port 8000, we'd use::
+implementation's ``istener`` method.  NGI comes with 2 implementations
+[#twistedimplementations]_, an implementation based on the ``asyncore``
+module from the standard library, ``zc.ngi.async``, and a testing
+implementation, ``zc.ngi.testing``.  To listen for network
+connections on ``localhost`` port ``8000```, we'd use::
 
     >>> import zc.ngi.async
 
     >>> address = 'localhost', 8000
-    >>> listener = zc.ngi.async.listener(address. Echo)
 
+.. let the listener pick the address
+
+    >>> address = None
+
+::
+
+    >>> listener = zc.ngi.async.listener(address, Echo)
+
 .. test it
 
-    >>> exec(src)
+    >>> import logging, sys
+    >>> loghandler = logging.StreamHandler(sys.stdout)
+    >>> logging.getLogger('zc.ngi').addHandler(loghandler)
+    >>> logging.getLogger('zc.ngi').setLevel(logging.ERROR)
+
+    Echo's handle_close is problematic when using async, dur to timing
+    uncertainty.
+
+    >>> Echo.handle_close = lambda *args: None
+
     >>> class EC:
     ...     def connected(self, connection):
     ...         connection.setHandler(self)
-    ...         conection.write('test data')
+    ...         connection.write('test data')
     ...     input = ''
     ...     def handle_input(self, connection, data):
     ...         self.input += data
@@ -278,14 +292,24 @@
     ...             print self.input
     ...             connection.close()
 
-    >>> import zc.ngi.blocking
-    >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, EC)
+    >>> import zc.ngi.blocking, time
+    >>> address = listener.address
 
-The listener call returns immediately.  The servicing of requests is
-done in a separate daemon thread provided by ``zc.ngi.async``.
+    We need the time.sleep call to give the server time to 
+    get its connection closed.
 
-Listener objects, returned from an impementation's listener function,
-provide methods for controlling servers.  The connections method
+    >>> zc.ngi.blocking.request(
+    ...     zc.ngi.async.connect, address, EC(), 3); time.sleep(.1)
+    TEST DATA
+    'client'
+
+
+The listener call immediately returns a listener object.  The
+servicing of requests is done in a separate daemon thread provided by
+``zc.ngi.async``.
+
+Listener objects, returned from an impementation's ``listener`` method,
+provide methods for controlling listeners.  The connections method
 returns an iterable of open connections to a server:
 
     >>> list(listener.connections())
@@ -297,23 +321,31 @@
 
 .. test it
 
+    >>> time.sleep(.1)
     >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, EC)
+    ... # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    ConnectionFailed: ...
 
-There's also a ``close_wait`` method that stops listening and waits
-for a given period of time for clients to finish on their own before
-closing them.
+.. XXX Future
 
+  There's also a ``close_wait`` method that stops listening and waits
+  for a given period of time for clients to finish on their own before
+  closing them.
+
 NGI doesn't keep the main thread running
 ----------------------------------------
 
 An important thing to note about NGI is that it doesn't provide
 support for maintaining the main application thread. The threads it
 creates for itself are "daemon" threads, meaning they don't keep an
-application running.  If a main program ended with an implentation's
-listener call. the program would likely exit before the listener had a
-chance to get and service any connections.
+application running when the main thread exists.  If a main program
+ended with an implentation's listener call. the program would likely
+exit before the listener had a chance to get and service any
+connections.
 
-It's up to us to keep an application running. Some frameworks provide
+It's up to you to keep an application running. Some frameworks provide
 a ``loop_forever`` call. The closest thing in NGI is::
 
     import threading
@@ -323,7 +355,7 @@
 If you wanted to provide a way to gracefully shut down an application,
 you'd provide some communication channel, such as a signnal handler,
 that closed any listeners and then set the event blocking the main
-thread from executing.
+thread from exiting.
 
 Testing servers
 ---------------
@@ -355,6 +387,8 @@
 
     >>> connection.peer.peer is connection
     True
+    >>> list(listener.connections()) == [connection.peer]
+    True
 
 The test connection has a default handler that just prints data to
 standard output, but we can call setHandler on it to use a different
@@ -367,19 +401,27 @@
     >>> connection.write('take this')
     got 'TAKE THIS'
 
+Now, the data sent back from the server is handled by our custom
+handler, rather than the default one.
+
+.. cleanup
+
+    >>> listener.close()
+
 Implementing clients
 ====================
 
-Implementing clients is a little bit more involved than writing
+Implementing clients is a little bit more involved than implementing
 servers because in addition to handling connections, you have to
 initiate the connections in the first place.  This involves
 implementing client connect handlers.  You request a connection by
-calling an implementation's ``connect`` function, passing a connect
-handler.  The ``connected`` method is called if the connection suceeds
-and the ``failed_connect`` method is called if it fails.
+calling an implementation's ``connect`` function, passing an address
+and a connect handler.  The handler's ``connected`` method is called
+if the connection suceeds and the handler's ``failed_connect`` method
+is called if it fails.
 
 Let's implement a word-count client.  It will take a string and use a
-work-count server to get it's line, word, and character counts::
+work-count server to get it's line and word counts::
 
   class WCClient:
 
@@ -436,7 +478,7 @@
 Conbining connect handlers with connection handlers
 ---------------------------------------------------
 
-A connect handler can be it's own connection handler:
+A connect handler can be it's own connection handler::
 
   class WCClient:
 
@@ -445,7 +487,7 @@
 
       def connected(self, connection):
           connection.setHandler(self)
-          connection.write(self.data)
+          connection.write("%s\n%s" % (len(self.data), self.data))
 
       def failed_connect(self, reason):
           print 'failed', reason
@@ -464,14 +506,14 @@
     >>> wcc = WCClient('Line one\nline two')
     >>> connection = zc.ngi.testing.Connection()
     >>> wcc.connected(connection)
-    -> 'Line one\nline two'
+    -> '17\nLine one\nline two'
 
     >>> connection.peer.write('more text from server\n')
     WCClient got more text from server
     <BLANKLINE>
     -> CLOSE
 
-and, of course, a generator can be used in the connected method:
+and, of course, a generator can be used in the connected method::
 
   class WCClientG:
 
@@ -480,7 +522,7 @@
 
       @zc.ngi.generator.handler
       def connected(self, connection):
-          connection.write(self.data)
+          connection.write("%s\n%s" % (len(self.data), self.data))
           input = ''
           while '\n' not in input:
               input += (yield)
@@ -493,35 +535,179 @@
 
     >>> if sys.version_info >= (2, 5):
     ...     exec(src)
-    ...     wcc = WCClientG('first one\nsecond one')
-    ...     connection = zc.ngi.testing.Connection()
-    ...     _ = wcc.connected(connection)
-    ...     connection.peer.write('still more text from server\n')
-    -> 'first one\nsecond one'
+    ... else:
+    ...     WCClientG = WCClient
+
+    >>> wcc = WCClientG('first one\nsecond one')
+    >>> connection = zc.ngi.testing.Connection()
+    >>> _ = wcc.connected(connection)
+    -> '20\nfirst one\nsecond one'
+
+    >>> connection.peer.write('still more text from server\n')
     Got still more text from server
     <BLANKLINE>
     -> CLOSE
 
-Conecting
----------
+Connecting
+----------
 
 Implementations provide a ``connect`` method that takes an address and
-connect handler.  We'll often refer to the ``connect`` method as a
-"connector".  Applications that maintain long-running connections will
-often need to reconnect when conections are lost or retry cnectins
-when they fail.  In situations like this, we'll often pass a connect
-function to the application.
+connect handler.
 
+Let's put everything together and connect our sever and client
+implementations.  First, we'll do this with the testing
+implementation:
+
+    >>> listener = zc.ngi.testing.listener(address, wc)
+    >>> zc.ngi.testing.connect(address, WCClient('hi\nout there'))
+    WCClient got 2 3
+    <BLANKLINE>
+
+    >>> listener.close()
+
+The ``testing`` ``listener`` method not only creates a listener, but also
+makes in available for connecting with the ``connect`` method.
+
+We'll see the same behavior with the ``zc.ngi.async`` implementation:
+
+.. let the listener pick an address:
+
+    >>> address = None
+
+::
+
+    >>> listener = zc.ngi.async.listener(address, wc)
+
+.. use the listener's address
+
+    >>> address = listener.address
+
+::
+
+    >>> import time
+    >>> zc.ngi.async.connect(address, WCClient('hi out\nthere')); time.sleep(.1)
+    WCClient got 2 3
+    <BLANKLINE>
+
+    >>> listener.close()
+
+Note that we use the time.sleep call above to wait for the connection
+to happen and run it's course.  This is needed for the ``async``
+because we're using real sockets and threads and there may be some
+small delay between when we rerquest the connection and when it
+happens. This isn't a problem with the testing implementation because
+the connection suceeds or fails right away and the implementation
+doesn't use a separate thread.
+
+We'll often refer to the ``connect`` method as a "connector".
+Applications that maintain long-running connections will often need to
+reconnect when conections are lost or retry cnectins when they fail.
+In situations like this, we'll often pass a connector to the
+application so that it can reconnect or retry a connection when
+needed.
+
+Testing connection logic
+------------------------
+
 When testing application connection logic, you'll typically create
-your own connector object.
+your own connector object. This is especially important if
+applications reconnect when a connextion is lost or fails.  Let's look
+at an example.  Here's a cient application that does nothing but try
+to stay connected::
 
+    class Stay:
+
+        def __init__(self, address, connector):
+            self.address = address
+            self.connector = connector
+            self.connector(self.address, self)
+
+        def connected(self, connection):
+            connection.setHandler(self)
+
+        def failed_connect(self, reason):
+            print 'failed connect', reason
+            self.connector(self.address, self)
+
+        def handle_input(self, connection, data):
+            print 'got', repr(data)
+
+        def handle_close(self, connection, reason):
+            print 'closed', reason
+            self.connector(self.address, self)
+
+.. -> src
+
+    >>> exec(src)
+
+To try this out, we'll create a trivial connector that just remembers
+the attempt::
+
+    def connector(addr, handler):
+        print 'connect request', addr, handler.__class__.__name__
+        global connect_handler
+        connect_handler = handler
+
+.. -> src
+
+    >>> exec(src)
+
+Now, if we create a Stay instance, it will call the connector passed
+to it:
+
+    >>> handler = Stay(('', 8000), connector)
+    connect request ('', 8000) Stay
+
+    >>> connect_handler is handler
+    True
+
+If the connection fails, the ``Stay`` handler will try it again:
+
+    >>> handler.failed_connect('test')
+    failed connect test
+    connect request ('', 8000) Stay
+
+    >>> connect_handler is handler
+    True
+
+If it suceeds and then is closed, the ``Stay`` connection handler will
+reconnect:
+
+    >>> connection = zc.ngi.testing.Connection()
+    >>> handler.connected(connection)
+    >>> connection.handler is handler
+    True
+
+    >>> connect_handler = None
+    >>> handler.handle_close(connection, 'test')
+    closed test
+    connect request ('', 8000) Stay
+
+    >>> connect_handler is handler
+    True
+
+The ``zc.ngi.testing`` module provides a test connector. If a listener
+is registered, then connections to it will succeed, otherwise they
+will fail.  It will raise an exception if it's called in response to a
+failed_connect call to prevent infitite loops:
+
+    >>> _ = Stay(('', 8000), zc.ngi.testing.connect)
+    failed connect no such server
+    For address, ('', 8000), a conenct handler called connect from a
+    failed_connect call.
+
+Connectors return immediately
+-----------------------------
+
 An important thing to note about making connections is that connector
 calls return immediately.  Connections are made and connection
 handlers are called in separate threads.  This means that you can have
-many outstading connect requests active at once.
+many outstading connect requests active at once.  It also means that,
+as with servers, it is your responsibility to keep client programs
+runing while handlers are doing their work.
 
-Blocking API
-============
+Blocking Client Requests
+------------------------
 
 Event-based API's can be very convenient when implementing servers,
 and sometimes even when implementing clients.  In many cases though,
@@ -533,43 +719,231 @@
 before exiting.
 
 To support the common use case of a client that makes a single request
-(or finite number of requests) to a server, the ``zc.ngi.blocking``
+(or small finite number of requests) to a server, the ``zc.ngi.blocking``
 module provides a ``request`` function that makes a single request and
 blocks until the request has completed::
 
     >>> import zc.ngi.blocking
-    >>> zc.ngi.blocking.request(zc.ngi.testing.connect, 'xxx', WCClient)
+    >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, WCClient)
+    ... # doctest: +ELLIPSIS
     Traceback (most recent call last):
     ...
-    ConnectionFailed: no such server
+    ConnectionFailed: ...
 
 The request function takes a connector, an address, and a connect
-handler. In the example above, we used the ``zc.ngi.testing``
-implementation's ``connect`` function as the connector.  The testing
-connector accepts any hashable object as an address.  By default,
-connections using the testing connector fail right away, as we saw
-above.
+handler. In the example above, we used the ``zc.ngi.async``
+implementation's ``connect`` function as the connector. The connection
+above failed because there wasn't a listener.  Let's try after
+starting a listener:
 
+.. let the listener pick the address:
 
+    >>> address = None
 
-request
-threading
-udp
-adapters
+::
+
+    >>> listener = zc.ngi.async.listener(address, wc)
+
+.. Use the picked address
+
+    >>> address = listener.address
+
+::
+
+    >>> zc.ngi.blocking.request(zc.ngi.async.connect, address, WCClient('xxx'))
+    WCClient got 1 1
+    <BLANKLINE>
+    'client'
+
+    >>> listener.close()
+
+The ``zc.ngi.blocking`` module has some other APIs for writing
+blocking network programs in an imperative style.  These were written
+before ``zc.ngi.generator`` and ``zc.ngi.blocking.request``. Now
+``zc.ngi.generator`` allows handlers to be written in an imperitive
+style without giving up the advantages, especially for testing, of
+reactive handlers.  The ``zc.ngi.blocking.request`` function now
+makes it easy for simple client pograms to wait for requests to
+complete.  For these reasons, the older blocking APIs are now
+deprecated.
+
+Connection Adapters
+===================
+
+Often, connection handlers have 2 functions:
+
+- Parse incoming data into messages according to some low-level
+  protocol.
+- Act on incoming messages to perform some application function.
+
+Examples of low-level protocols include line-oriented protocols where
+messages are line terminated, and sized-message protocols, where
+messages are preceded by message sizes.  The word-count example above
+used a sized-message protocol.  A common pattern in NGI is to separate
+low-level protocol handling into a separate component using a
+connection adapter.  When we get a connection, we wrap it with an
+adapter to perform the low-level processing.  Here's an adapter that
+deals with the handling of sized messages for the wordcount example::
+
+    class Sized:
+
+        def __init__(self, connection):
+            self.input = ''
+            self.handler = self.count = None
+            self.connection = connection
+            self.close = connection.close
+            self.write = connection.write
+            self.writelines = connection.writelines
+
+        def setHandler(self, handler):
+            self.handler = handler
+            if hasattr(handler, 'handle_close'):
+                self.handle_close = handler.handle_close
+            if hasattr(handler, 'handle_exception'):
+                self.handle_exception = handler.handle_exception
+            self.connection.setHandler(self)
+
+        def handle_input(self, connection, data):
+            self.input += data
+            if self.count is None:
+                if '\n' not in self.input:
+                    return
+                count, self.input = self.input.split('\n', 1)
+                self.count = int(count)
+            if len(self.input) < self.count:
+                return
+            data = self.input[:self.count]
+            self.input = self.input[self.count:]
+            self.handler.handle_input(self, data)
+
+.. -> src
+
+    >>> exec(src)
+
+With this adapter, we can now write a much simpler version of the
+word-count server:
+
+  class WCAdapted:
+
+      def __init__(self, connection):
+          Sized(connection).setHandler(self)
+
+      def handle_input(self, connection, data):
+          connection.write(
+              '%d %d\n' % (len(data.split('\n')), len(data.split())))
+
+
+.. -> src
+
+    >>> exec(src)
+
+    >>> listener = zc.ngi.testing.listener(WCAdapted)
+    >>> connection = listener.connect()
+    >>> connection.write('15')
+    >>> connection.write('\nhello out\nthere')
+    -> '2 3\n'
+
+    >>> listener.close()
+
+We can also use adapters with generator-based handlers by passing an
+adapter factory to ``zc.ngi.generator.handler`` using the
+``connection_adapter`` keyword argument. Here's the generator version of ther
+word count server using an adapter::
+
+    @zc.ngi.generator.handler(connection_adapter=Sized)
+    def wcadapted(connection):
+        while 1:
+            data = (yield)
+            connection.write(
+                '%d %d\n' % (len(data.split('\n')), len(data.split())))
+
+.. -> src
+
+    >>> if sys.version_info >= (2, 5):
+    ...     exec(src)
+    ...     listener = zc.ngi.testing.listener(wcadapted)
+    ...     connection = listener.connect()
+    ...     connection.write('15')
+    ...     connection.write('\nhello out\nthere')
+    ...     listener.close()
+    -> '2 3\n'
+
+By separating the low-level protocol handling from the application
+logic, we can reuse the low-level protocol in other applications, and
+we can use other low-level protocol with our word-count application.
+
+The ``zc.ngi.adapters`` module provides 2 connection adapters:
+
+``Lines``
+     The ``Lines`` adapter splits input data into records terminated
+     new-line characters.  Records are passed to applications without
+     the terminnating new-line characters.
+
+``Sized``
+     The ``Sized`` connection adapter support sized input and output
+     records.  Each record is preceeded by a 4-byte big-endian record
+     size.  Application's handle_input methods are called with
+     complete records, with the size prefix removed. The adapted
+     connection ``write`` (or ``writelines``) methods take records (or
+     record iterators) and prepend record sizes.
+
+UDP
+===
+
+The NGI also supports UDP networking.  Applications can send UDP
+messages by calling an implementation's ``udp`` method:
+
+    >>> zc.ngi.testing.udp(('', 8000), 'hello udp')
+
+If there isn't a udp listener registered, then nothing will happen.
+
+You can also listen for udp requests by registering a callable with an
+implementation's ``udp_listener``:
+
+    >>> def handle(addr, s):
+    ...     print 'got udp', s, 'from address', addr
+    >>> listener = zc.ngi.testing.udp_listener(('', 8000), handle)
+    >>> zc.ngi.testing.udp(('', 8000), 'hello udp')
+    got udp hello udp from address <test>
+
+    >>> listener.close()
+    >>> zc.ngi.testing.udp(('', 8000), 'hello udp')
+
+Threading
+=========
+
+NGI tries ro accomodate threaded applications without imposing
+thread-safety requirements.
+
+- Implementation (IImplementation) methods ``connect``, ``listener``,
+  ``udp`` and ``udp_listener`` are thread safe. They may be called at
+  any time by any thread.
+
+- Connection (IConnection) methods ``write``, ``writelines``, and
+  ``close`` are  thread safe. They may be called at
+  any time by any thread.
+
+  The connection setHandler method must only be called in a connect
+  handler's ``connected`` method or a connection handler's
+  ``handle_input`` method.
+
+- Application handler methods need not be thread safe.  NGI
+  implementations will never call them from more than one thread at a
+  time.
+
 ----------------------
 
-Notes:
+.. [#twisted] The Twisted networking framework also provides this
+   separation. Twisted doesn't leverage this separation to provide a clean
+   testing invironment as NGI does, although it's likely that it will
+   in the future.
 
-- Maybe close should change to wait until data are sent
-- Maybe grow a close_now. Or some such. Or maybe grow a close_after_sent.
-- What about errors raised by handle_input?
-- Need to make sure we have tests of edge cases where there are errors
-  calling handler methods.
-- testing.listener doesn't use the address argument
+.. [#twistedimplementations] A number of implementations based on
+   Twisted are planned, including a basic Twisted implementation and
+   an implementation using ``twisted.conch`` that will support
+   communication over ssh channels.
 
-- Can we implement application connection retry logic wo threads?
-  Should we? Testing would be easier if the implementation provided
-  it. If conectors took a delay argument, then it would be easier to test.
-- exception hamdling needs more thought
-  - what exceptions get reported
-  - where?
+.. cleanup
+
+    >>> logging.getLogger('zc.ngi').removeHandler(loghandler)
+    >>> logging.getLogger('zc.ngi').setLevel(logging.NOTSET)

Modified: zc.ngi/branches/jim-dev/src/zc/ngi/generator.py
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/generator.py	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/src/zc/ngi/generator.py	2009-09-14 19:15:02 UTC (rev 103973)
@@ -12,18 +12,34 @@
 #
 ##############################################################################
 
-class handler(object):
 
-    def __init__(self, func):
+def handler(func=None, connection_adapter=None):
+    if func is None:
+        return lambda func: Handler(func, connection_adapter)
+    return Handler(func, connection_adapter)
+
+class Handler(object):
+
+    def __init__(self, func, connection_adapter):
         self.func = func
+        self.connection_adapter = connection_adapter
 
     def __call__(self, *args):
+        if self.connection_adapter is not None:
+            args = args[:-1]+(self.connection_adapter(args[-1]), )
         return ConnectionHandler(self.func(*args), args[-1])
 
     def __get__(self, inst, class_):
         if inst is None:
             return self
 
+        if self.connection_adapter is not None:
+            def connected(connection):
+                connection = self.connection_adapter(connection)
+                return ConnectionHandler(self.func(inst, connection),
+                                         connection)
+            return connected
+
         return (lambda connection:
                 ConnectionHandler(self.func(inst, connection), connection)
                 )

Modified: zc.ngi/branches/jim-dev/src/zc/ngi/interfaces.py
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/interfaces.py	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/src/zc/ngi/interfaces.py	2009-09-14 19:15:02 UTC (rev 103973)
@@ -69,6 +69,9 @@
         connected method will be called with an IConnection object
         if and when the connection succeeds or failed_connect method
         will be called if the connection fails.
+
+        This method os thread safe. It may be called by any thread at
+        any time.
         """
 
     def listener(address, handler):
@@ -77,18 +80,27 @@
         When a connection is received, call the handler.
 
         An IListener object is returned.
+
+        This method os thread safe. It may be called by any thread at
+        any time.
         """
 
     def udp(address, message):
         """Send a UDP message
+
+        This method is thread safe. It may be called by any thread at
+        any time.
         """
 
-    def udp_listen(address, handler, buffer_size=4096):
+    def udp_listener(address, handler, buffer_size=4096):
         """Listen for incoming UDP messages
 
         When a message is received, call the handler with the message.
 
         An IUDPListener object is returned.
+
+        This method os thread safe. It may be called by any thread at
+        any time.
         """
 
 class IConnection(Interface):
@@ -124,6 +136,9 @@
         """Output a string to the connection.
 
         The write call is non-blocking.
+
+        This method os thread safe. It may be called by any thread at
+        any time.
         """
 
     def writelines(data):
@@ -131,10 +146,16 @@
 
         The writelines call is non-blocking. Note, that the data may
         not have been consumed when the method returns.
+
+        This method os thread safe. It may be called by any thread at
+        any time.
         """
 
     def close():
         """Close the connection
+
+        This method os thread safe. It may be called by any thread at
+        any time.
         """
 
 class IServerConnection(IConnection):
@@ -237,6 +258,8 @@
     This is an implementation interface.
     """
 
+    address = Attribute("The address the listener is listening on.")
+
     def connections():
         """return an iterable of the current connections
         """

Modified: zc.ngi/branches/jim-dev/src/zc/ngi/testing.py
===================================================================
--- zc.ngi/branches/jim-dev/src/zc/ngi/testing.py	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/src/zc/ngi/testing.py	2009-09-14 19:15:02 UTC (rev 103973)
@@ -67,8 +67,25 @@
                 method, args = self.queue.pop(0)
                 if self.closed and method != 'handle_close':
                     break
+
                 try:
-                    getattr(self.handler, method)(self, *args)
+                    try:
+                        handler = getattr(self.handler, method)
+                    except AttributeError:
+                        if method == 'handle_close':
+                            return # Optional method
+                        elif method == 'handle_exception':
+                            # Unhandled exception
+                            self.close()
+                            handler = getattr(self.handler, 'handle_close',
+                                              None)
+                            if handler is None:
+                                return
+                            args = self, 'unhandled exception'
+                        else:
+                            raise
+
+                    handler(self, *args)
                 except:
                     print "Error test connection calling connection handler:"
                     traceback.print_exc(file=sys.stdout)
@@ -156,14 +173,29 @@
         Connection.__init__(self, peer, handler)
 
 _connectable = {}
-
+_recursing = object()
 def connect(addr, handler):
     connections = _connectable.get(addr)
-    if connections:
-        handler.connected(connections.pop(0))
-    else:
-        handler.failed_connect('no such server')
+    if isinstance(connections, list):
+        if connections:
+            return handler.connected(connections.pop(0))
+    elif isinstance(connections, listener):
+        return handler.connected(connections.connect())
+    elif connections is _recursing:
+        print (
+            "For address, %r, a conenct handler called connect from a\n"
+            "failed_connect call."
+            % (addr, ))
+        del _connectable[addr]
+        return
 
+    _connectable[addr] = _recursing
+    handler.failed_connect('no such server')
+    try:
+        del _connectable[addr]
+    except KeyError:
+        pass
+
 connector = connect
 
 def connectable(addr, connection):
@@ -174,6 +206,10 @@
     def __init__(self, addr, handler=None):
         if handler is None:
             handler = addr
+            addr = None
+        else:
+            _connectable[addr] = self
+        self.address = addr
         self._handler = handler
         self._close_handler = None
         self._connections = []
@@ -201,6 +237,8 @@
         return iter(self._connections)
 
     def close(self, handler=None):
+        if self.address is not None:
+            del _connectable[self.address]
         self._handler = None
         if handler is None:
             while self._connections:

Modified: zc.ngi/branches/jim-dev/todo.txt
===================================================================
--- zc.ngi/branches/jim-dev/todo.txt	2009-09-14 16:57:40 UTC (rev 103972)
+++ zc.ngi/branches/jim-dev/todo.txt	2009-09-14 19:15:02 UTC (rev 103973)
@@ -1,12 +1,12 @@
-- Test for blocking where there is some i/o before timeout.
 
-- Test for blocking where there is some i/o before success.
 
-- Test blocking timout on writelines and non-blocking option
 
-- Test handle_exception for blocking
+Notes/to-do:
 
-- Test we fail early when passing non-iterable to writelines
+- deal with testing addresses for async
 
-- Document how to close blocking connections (call input.close(), go
-  figure)
+- testing.listener doesn't use the address argument.
+  should be possible to set up a testing listener that a client can connect
+  to.
+
+- fix method nameing, deprecating camel-case methods



More information about the checkins mailing list