[Checkins] SVN: Sandbox/J1m/zc.zeo/src/zc/zeo/ Added initial apparenty-working ngi-based simple low-level rpc facility.

Jim Fulton jim at zope.com
Tue Jan 1 12:47:22 EST 2008


Log message for revision 82631:
  Added initial apparenty-working ngi-based simple low-level rpc facility.
  

Changed:
  A   Sandbox/J1m/zc.zeo/src/zc/zeo/
  A   Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py
  A   Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py
  A   Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py
  A   Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py
  A   Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt

-=-
Added: Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py	                        (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py	2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,14 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+


Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py	                        (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py	2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,106 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import zope.interface
+
+class IServerProtocolHandler(zope.interface.Interface):
+
+    def __call__(client_protocol):
+        """Return an IServerHandler for the given conection and protocol.
+        """
+
+class IClientProtocolHandler(zope.interface.Interface):
+
+    def __call__(server_protocol):
+        """Return an IClientHandler for the given conection and protocol.
+
+        The handler.protocol must be less than or equal to the server protocol.
+        """
+
+class IBaseHandler(zope.interface.Interface):
+
+    protocol = zope.interface.Attribute("A string giving the handler protocol")
+
+    def handle_connection(connection):
+        """Begin communication
+
+        This is called once after the connection has been established.
+        """
+
+    def handle_one_way(connection, method, *arguments):
+        """Handle a one-way call
+        """
+
+    def handle_close(connection, reason):
+        """Handle a connection close
+        """
+
+IClientHandler = IBaseHandler
+
+class IServerHandler(IBaseHandler):
+
+    def handle_call(server_connection, message_id, method, *arguments):
+        """Handle a normal (2-way) call
+
+        The return value is ignored. The handler must later call
+        result or exception on the server_connection.
+        """
+
+class IBaseConnection(zope.interface.Interface):
+
+    def one_way(method, *arguments):
+        """Send a one-way message
+        """
+
+    def close():
+        """Close the connection
+        """
+
+class IClientConnection(IBaseConnection):
+
+    def call(method, *arguments):
+        """Call a method on the server, waiting for the result.
+
+        This is a convenience method that returns the result of
+        calling the wait method with the result of calling the request
+        method.
+
+        The separate request and wait methods are mainly useful for
+        testing, although they allow multiple calls to be made
+        simultaniously.
+        """
+
+    def request(method, *arguments):
+        """Start calling a remote method
+
+        A message id is returned. This can be passed to the wait
+        method to wait for a result.
+        """
+
+    def wait(message_id):
+        """Wait for and return a request result.
+        """
+
+class IServerConnection(IBaseConnection):
+
+    def result(message_id, value):
+        """Return the result of a previous call.
+        """
+
+    def exception(message_id, value):
+        """Return an exception value
+        """
+
+
+


Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py	                        (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py	2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,21 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+from zope.testing import doctest
+
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocFileSuite('zrpc.txt'),
+        ))
+


Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py	                        (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py	2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,208 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import cPickle
+import cStringIO
+import logging
+import sys
+import threading
+
+import zc.ngi.adapters
+import zope.interface
+
+logger = logging.getLogger(__name__)
+
+class RPCError(Exception):
+    """An error in the RPC protocol
+    """
+
+class BadGlobal(RPCError):
+    """No such module
+    """
+
+class Closed(RPCError):
+    """An attempt was made to send or recieve data on a closed connection.
+    """
+
+class _BaseConnection:
+
+    def __init__(self, connection, protocol_handler):
+        connection = zc.ngi.adapters.Sized(connection)
+        self._connection = connection
+        self._protocol_handler = protocol_handler
+        connection.setHandler(self)
+
+    def handle_input(self, connection, data):
+        try:
+            handler = self._handler
+        except AttributeError:
+            self._handle_protocol(data)
+            self._handler.handle_connection(self)
+            return
+
+        unpickler = cPickle.Unpickler(cStringIO.StringIO(data))
+        unpickler.find_global = self._find_global
+        message_id, one_way, method_name, arguments = unpickler.load()
+        if one_way:
+            handler.handle_one_way(self, method_name, *arguments)
+        else:
+            self._handle_message(message_id, method_name, arguments)
+
+    _closed = False
+    def close(self):
+        self._closed = True
+        self._connection.close()
+
+    def handle_close(self, connection, reason):
+        self._closed = True
+        try:
+            handler = self._handler
+        except AttributeError:
+            pass
+        else:
+            handler.handle_close(self, reason)
+
+    def one_way(self, method, *arguments):
+        if self._closed:
+            raise Closed
+        self._connection.write(cPickle.dumps((0, 1, method, arguments), 1))
+
+class ServerConnection(_BaseConnection):
+    _find_global = None
+
+    def __init__(self, connection, protocol_handler, protocol=None):
+        if protocol is None:
+            protocol = protocol_handler.protocol
+        _BaseConnection.__init__(self, connection, protocol_handler)
+        self._connection.write(protocol)
+
+    def _handle_protocol(self, protocol):
+        self._handler = self._protocol_handler(protocol)
+        
+    def _handle_message(self, message_id, meth, arguments):
+        try:
+            self._handler.handle_call(self, message_id, meth, arguments)
+        except:
+            self.exception(message_id, sys.exc_info()[1])
+
+    def result(self, message_id, value):
+        if self._closed:
+            raise Closed
+        self._connection.write(
+            cPickle.dumps((message_id, 0, '.reply', value), 1)
+            )
+
+    def exception(self, message_id, value):
+        if self._closed:
+            raise Closed
+        value = value.__class__, value
+        self._connection.write(
+            cPickle.dumps((message_id, 0, '.reply', value), 1)
+            )
+
+class ClientConnection(_BaseConnection):
+
+    def __init__(self, connection, protocol_handler):
+        _BaseConnection.__init__(self, connection, protocol_handler)
+        self._replies = {}
+        self._cond = threading.Condition()
+        self._message_id_lock = threading.Lock()
+
+    def _handle_protocol(self, protocol):
+        self._handler = self._protocol_handler(protocol)
+        self._connection.write(self._handler.protocol)
+
+    def _handle_message(self, message_id, meth, arguments):
+        if meth != '.reply':
+            logger.critical("Unexpected cliet call")
+            self.close()
+            self._handler.handle_close("Bad Server")
+        else:
+            cond = self._cond
+            cond.acquire()
+            try:
+                self._replies[message_id] = arguments
+                cond.notifyAll()
+            finally:
+                cond.release()
+
+    def _next_message_id(self):
+        self._lock.acquire()
+        try:
+            self._message_id = message_id = self._message_id + 1
+            return message_id
+        finally:
+            self._lock.release()
+
+    _message_id = 0
+    def request(self, method, *arguments):
+        if self._closed:
+            raise Closed
+        self._message_id_lock.acquire()
+        try:
+            self._message_id = message_id = self._message_id + 1
+        finally:
+            self._message_id_lock.release()
+        self._connection.write(
+            cPickle.dumps((message_id, 0, method, arguments))
+            )
+        return message_id
+
+    def wait(self, message_id):
+        if self._closed:
+            raise Closed
+        cond = self._cond
+        replies = self._replies
+        cond.acquire()
+        try:
+            while 1:
+                try:
+                    if self._closed:
+                        raise Closed
+                    result = replies.pop(message_id)
+                except KeyError:
+                    pass
+                else:
+                    break
+                cond.wait()
+        finally:
+            cond.release()
+
+        if (isinstance(result, tuple)
+            and len(result) == 2
+            and isinstance(result[1], Exception)):
+            raise result[1]
+        return result
+
+    def call(self, meth, *args):
+        return self.wait(self.request(meth, *args))
+
+    def _find_global(self, module_name, name):
+        # Find a global, which must be an exception subclass
+
+        try:
+            module = sys.modules[module_name]
+        except KeyError:
+            raise BadGlobal(module_name, name)
+
+        try:
+            r = getattr(module, name)
+        except AttributeError:
+            raise BadGlobal(module_name, name)
+
+        if issubclass(r, Exception):
+            return r
+
+        raise BadGlobal(module_name, name)
+    


Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt	                        (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt	2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,230 @@
+ZEO Remote Procedute Call
+=========================
+
+The zc.zeo.zrpc package provides a very very simple RPC mechanism to
+support Zope Enterprise Objects, the client-server facility for ZODB.
+
+At the lowest level, the protocol consists of sized string
+messages. Each message begins with a 4-byte big-endian message size
+and is followed by the message data.
+
+The protocol begins with protocol negotiation.  The client and
+server each send a protocol identifier.  The client waits for the
+server protocol.  Typically, the client sends the lesser of the client and
+server protocol. (This implies that protocols are ordered.)  If the
+server doesn't support the protocol sent by the client, then it closes
+the connection with the client.
+
+On top of the low-level size-message protocol is the rpc
+protocol. Each message is a pickle of a 4-tuple:
+
+message id
+  Sequential integer message identifier, unique to the client, for
+  two-way calls.
+
+flags
+  This is either 0 for two-way calls or 1 for one-way calls.
+
+method name
+  For replies, this is the special string, ".reply".
+
+arguments
+  The method arguments.
+
+(This structure is a bit silly, but is retained for compatibility with
+old clients.)
+
+The API for the RPC mechanism is fairly low level.  The API builds on
+zc.ngi.  There are a pair of very simple connection adapters,
+zc.zeo.zrpc.ServerConnection and zc.zeo.zrpc.ClientConnection.  The
+differences between these are very minor:
+
+- ServerConnections can only send one-way messages, while
+  ClientConnections can send both one-way and 2-way messages.
+
+- ServerConnections send their protocol immediately.
+  ClientConnections wait to read the protocol sent by the server and
+  then send the lesser of their protocol and the servers.
+
+To implement a server, we provide a server connection handler that
+calls the ServerConnection constructor.  We will pass this an ngi
+connection, a protocol handler and a server protocol.  We can omit the
+server protocol if the protocol handler has a protocol attribute.
+
+We need to create aserver protocol handler and a server handler.
+We'll provide both as a simple class:
+
+    >>> class Server:
+    ...     protocol = 'p42'
+    ...
+    ...     def __init__(self, protocol):
+    ...         print protocol
+    ...         self.protocol = protocol
+    ...         global server_handler
+    ...         server_handler = self
+    ...
+    ...     def handle_connection(self, connection):
+    ...         self.connection = connection
+    ...         connection.one_way('hello', 'client')
+    ...
+    ...     def handle_one_way(self, connection, method, *arguments):
+    ...         print 'Server handle_one_way', method, arguments
+    ...         if method == 'fail':
+    ...             raise ValueError(*arguments)
+    ...
+    ...     def handle_call(self, connection, message_id, method, *arguments):
+    ...         print 'Server handle_call', message_id, method, arguments
+    ...         if method == 'fail':
+    ...             raise ValueError(*arguments)
+    ...
+    ...     def handle_close(self, connection, reason):
+    ...         print 'Server closed', reason
+    
+This is a very trivial class that mostly just prints what it is
+passed. It does send a one-way hello message when it is connected.
+Because this class can "handle" all protocols, we use the class itself
+as the protocol handler. It also saves the handler as a global
+variable so we can access it.  We set the server protocol as the
+protocol attribute of the class so we don't have to pass it to the
+ServerConnection constructor. 
+
+We create an event object that we set whenever we get a message. This
+is purely an aid for this doctest. Normal servers wouldn't do this.
+
+Now, we'll create a testing listener:
+
+    >>> import zc.ngi.testing, zc.zeo.zrpc
+    >>> listener = zc.ngi.testing.listener(
+    ...     lambda connection:
+    ...     zc.zeo.zrpc.ServerConnection(connection, Server)
+    ...     )
+
+We passed the listener a callable that takes a connection and calles
+the ServerConnection constructor.
+
+Defining a client is fairly similar to defining a server:
+
+    >>> class Client:
+    ...     def __init__(self, protocol):
+    ...         print protocol
+    ...         self.protocol = protocol
+    ...         global client_handler
+    ...         client_handler = self
+    ...
+    ...     def handle_connection(self, connection):
+    ...         self.connection = connection
+    ...         connection.one_way('hello', 'server')
+    ...
+    ...     def handle_one_way(self, connection, method, *arguments):
+    ...         print 'Client handle_one_way', method, arguments
+    ...
+    ...     def handle_close(self, connection, reason):
+    ...         print 'Client closed', reason
+
+We need an ngi client connection handler that accepts an ngi
+connection and applies the ClientConnection adapter.
+
+    >>> class ConnectionHandler:
+    ...     def __init__(self, addr, connector):
+    ...         connector(addr, self)
+    ...     def connected(self, connection):
+    ...         self.connection = zc.zeo.zrpc.ClientConnection(
+    ...             connection, Client)
+    ...     def failed_connect(self, reason):
+    ...         print 'connection failed', reason
+
+Now we can try creating a connection:
+
+    >>> _ = ConnectionHandler('', listener.connector)
+    p42
+    p42
+    Server handle_one_way hello ('server',)
+    Client handle_one_way hello ('client',)
+
+We saw one-way calls get sent.  Let's try making a regular
+call. Normally, we'd do this with the call method, but we want to
+split the call activity into requesting and waiting for a reply so we
+can verify the request that was sent and provide a response directly:
+
+Now, let's call the server:
+
+    >>> message_id = client_handler.connection.request(
+    ...     'meth', 'arg1', 'arg2', 3)
+    Server handle_call 1 meth (('arg1', 'arg2', 3),)
+
+    >>> message_id
+    1
+
+Here we see the resulting method and arguments get passed to the
+server.
+
+Now, we'll send a reply:
+
+    >>> server_handler.connection.result(1, 'a result')
+    >>> client_handler.connection.wait(message_id)
+    'a result'
+
+Only basic objects (objects with built-in support in the pickle
+module) can be included as arguments or as results.  Exception
+instances may be returned from server calls as exceptions.
+
+    >>> class C:
+    ...     pass
+
+    >>> client_handler.connection.request('meth', C())
+    ... # doctest: +ELLIPSIS
+    Error test connection calling connection handler:
+    Traceback (most recent call last):
+    ...
+    UnpicklingError: Global and instance pickles are not supported.
+    Client closed closed
+    Server closed handle_input error
+    2
+
+When we try to get a response, we'll get an error indicating that the
+connection was closed:
+
+    >>> client_handler.connection.wait(message_id)
+    Traceback (most recent call last):
+    ...
+    Closed
+
+
+The pickle problem is detected on the server when it tries to unpickle
+the message. We see that the connection was closed due to the server
+error. We need to reconnect:
+
+    >>> _ = ConnectionHandler('', listener.connector)
+    p42
+    p42
+    Server handle_one_way hello ('server',)
+    Client handle_one_way hello ('client',)
+
+Server application exceptions will be raised on the client:
+
+    >>> message_id = client_handler.connection.request('meth', 1, 2)
+    Server handle_call 1 meth ((1, 2),)
+
+    >>> message_id
+    1
+
+    >>> server_handler.connection.exception(message_id, ValueError('eek'))
+    >>> client_handler.connection.wait(message_id)
+    Traceback (most recent call last):
+    ...
+    ValueError: eek
+
+
+    >>> message_id = client_handler.connection.request('fail', 1, 2)
+    Server handle_call 2 fail ((1, 2),)
+
+    >>> message_id
+    2
+
+    >>> client_handler.connection.wait(message_id)
+    Traceback (most recent call last):
+    ...
+    ValueError: (1, 2)
+
+    >>> client_handler.connection.close()
+    Server closed closed


Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt
___________________________________________________________________
Name: svn:eol-style
   + native



More information about the Checkins mailing list