[Checkins] SVN: Sandbox/J1m/zc.zeo/src/zc/zeo/ Added initial
apparently-working ngi-based simple low-level rpc facility.
Jim Fulton
jim at zope.com
Tue Jan 1 12:47:22 EST 2008
Log message for revision 82631:
Added initial apparently-working ngi-based simple low-level rpc facility.
Changed:
A Sandbox/J1m/zc.zeo/src/zc/zeo/
A Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py
A Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py
A Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py
A Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py
A Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt
-=-
Added: Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py 2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,14 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/__init__.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py 2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,106 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import zope.interface
+
+class IServerProtocolHandler(zope.interface.Interface):
+
+ def __call__(client_protocol):
+ """Return an IServerHandler for the given conection and protocol.
+ """
+
+class IClientProtocolHandler(zope.interface.Interface):
+
+ def __call__(server_protocol):
+ """Return an IClientHandler for the given conection and protocol.
+
+ The handler.protocol must be less than or equal to the server protocol.
+ """
+
+class IBaseHandler(zope.interface.Interface):
+
+ protocol = zope.interface.Attribute("A string giving the handler protocol")
+
+ def handle_connection(connection):
+ """Begin communication
+
+ This is called once after the connection has been established.
+ """
+
+ def handle_one_way(connection, method, *arguments):
+ """Handle a one-way call
+ """
+
+ def handle_close(connection, reason):
+ """Handle a connection close
+ """
+
+IClientHandler = IBaseHandler
+
+class IServerHandler(IBaseHandler):
+
+ def handle_call(server_connection, message_id, method, *arguments):
+ """Handle a normal (2-way) call
+
+ The return value is ignored. The handler must later call
+ result or exception on the server_connection.
+ """
+
+class IBaseConnection(zope.interface.Interface):
+
+ def one_way(method, *arguments):
+ """Send a one-way message
+ """
+
+ def close():
+ """Close the connection
+ """
+
+class IClientConnection(IBaseConnection):
+
+ def call(method, *arguments):
+ """Call a method on the server, waiting for the result.
+
+ This is a convenience method that returns the result of
+ calling the wait method with the result of calling the request
+ method.
+
+ The separate request and wait methods are mainly useful for
+ testing, although they allow multiple calls to be made
+ simultaniously.
+ """
+
+ def request(method, *arguments):
+ """Start calling a remote method
+
+ A message id is returned. This can be passed to the wait
+ method to wait for a result.
+ """
+
+ def wait(message_id):
+ """Wait for and return a request result.
+ """
+
+class IServerConnection(IBaseConnection):
+
+ def result(message_id, value):
+ """Return the result of a previous call.
+ """
+
+ def exception(message_id, value):
+ """Return an exception value
+ """
+
+
+
Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/interfaces.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py 2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,21 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+from zope.testing import doctest
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocFileSuite('zrpc.txt'),
+ ))
+
Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/tests.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py 2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,208 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import cPickle
+import cStringIO
+import logging
+import sys
+import threading
+
+import zc.ngi.adapters
+import zope.interface
+
+logger = logging.getLogger(__name__)
+
+class RPCError(Exception):
+ """An error in the RPC protocol
+ """
+
+class BadGlobal(RPCError):
+ """No such module
+ """
+
+class Closed(RPCError):
+ """An attempt was made to send or recieve data on a closed connection.
+ """
+
+class _BaseConnection:
+
+ def __init__(self, connection, protocol_handler):
+ connection = zc.ngi.adapters.Sized(connection)
+ self._connection = connection
+ self._protocol_handler = protocol_handler
+ connection.setHandler(self)
+
+ def handle_input(self, connection, data):
+ try:
+ handler = self._handler
+ except AttributeError:
+ self._handle_protocol(data)
+ self._handler.handle_connection(self)
+ return
+
+ unpickler = cPickle.Unpickler(cStringIO.StringIO(data))
+ unpickler.find_global = self._find_global
+ message_id, one_way, method_name, arguments = unpickler.load()
+ if one_way:
+ handler.handle_one_way(self, method_name, *arguments)
+ else:
+ self._handle_message(message_id, method_name, arguments)
+
+ _closed = False
+ def close(self):
+ self._closed = True
+ self._connection.close()
+
+ def handle_close(self, connection, reason):
+ self._closed = True
+ try:
+ handler = self._handler
+ except AttributeError:
+ pass
+ else:
+ handler.handle_close(self, reason)
+
+ def one_way(self, method, *arguments):
+ if self._closed:
+ raise Closed
+ self._connection.write(cPickle.dumps((0, 1, method, arguments), 1))
+
+class ServerConnection(_BaseConnection):
+ _find_global = None
+
+ def __init__(self, connection, protocol_handler, protocol=None):
+ if protocol is None:
+ protocol = protocol_handler.protocol
+ _BaseConnection.__init__(self, connection, protocol_handler)
+ self._connection.write(protocol)
+
+ def _handle_protocol(self, protocol):
+ self._handler = self._protocol_handler(protocol)
+
+ def _handle_message(self, message_id, meth, arguments):
+ try:
+ self._handler.handle_call(self, message_id, meth, arguments)
+ except:
+ self.exception(message_id, sys.exc_info()[1])
+
+ def result(self, message_id, value):
+ if self._closed:
+ raise Closed
+ self._connection.write(
+ cPickle.dumps((message_id, 0, '.reply', value), 1)
+ )
+
+ def exception(self, message_id, value):
+ if self._closed:
+ raise Closed
+ value = value.__class__, value
+ self._connection.write(
+ cPickle.dumps((message_id, 0, '.reply', value), 1)
+ )
+
+class ClientConnection(_BaseConnection):
+
+ def __init__(self, connection, protocol_handler):
+ _BaseConnection.__init__(self, connection, protocol_handler)
+ self._replies = {}
+ self._cond = threading.Condition()
+ self._message_id_lock = threading.Lock()
+
+ def _handle_protocol(self, protocol):
+ self._handler = self._protocol_handler(protocol)
+ self._connection.write(self._handler.protocol)
+
+ def _handle_message(self, message_id, meth, arguments):
+ if meth != '.reply':
+ logger.critical("Unexpected cliet call")
+ self.close()
+ self._handler.handle_close("Bad Server")
+ else:
+ cond = self._cond
+ cond.acquire()
+ try:
+ self._replies[message_id] = arguments
+ cond.notifyAll()
+ finally:
+ cond.release()
+
+ def _next_message_id(self):
+ self._lock.acquire()
+ try:
+ self._message_id = message_id = self._message_id + 1
+ return message_id
+ finally:
+ self._lock.release()
+
+ _message_id = 0
+ def request(self, method, *arguments):
+ if self._closed:
+ raise Closed
+ self._message_id_lock.acquire()
+ try:
+ self._message_id = message_id = self._message_id + 1
+ finally:
+ self._message_id_lock.release()
+ self._connection.write(
+ cPickle.dumps((message_id, 0, method, arguments))
+ )
+ return message_id
+
+ def wait(self, message_id):
+ if self._closed:
+ raise Closed
+ cond = self._cond
+ replies = self._replies
+ cond.acquire()
+ try:
+ while 1:
+ try:
+ if self._closed:
+ raise Closed
+ result = replies.pop(message_id)
+ except KeyError:
+ pass
+ else:
+ break
+ cond.wait()
+ finally:
+ cond.release()
+
+ if (isinstance(result, tuple)
+ and len(result) == 2
+ and isinstance(result[1], Exception)):
+ raise result[1]
+ return result
+
+ def call(self, meth, *args):
+ return self.wait(self.request(meth, *args))
+
+ def _find_global(self, module_name, name):
+ # Find a global, which must be an exception subclass
+
+ try:
+ module = sys.modules[module_name]
+ except KeyError:
+ raise BadGlobal(module_name, name)
+
+ try:
+ r = getattr(module, name)
+ except AttributeError:
+ raise BadGlobal(module_name, name)
+
+ if issubclass(r, Exception):
+ return r
+
+ raise BadGlobal(module_name, name)
+
Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt
===================================================================
--- Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt (rev 0)
+++ Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt 2008-01-01 17:47:22 UTC (rev 82631)
@@ -0,0 +1,230 @@
+ZEO Remote Procedure Call
+=========================
+
+The zc.zeo.zrpc package provides a very very simple RPC mechanism to
+support Zope Enterprise Objects, the client-server facility for ZODB.
+
+At the lowest level, the protocol consists of sized string
+messages. Each message begins with a 4-byte big-endian message size
+and is followed by the message data.
+
+The protocol begins with protocol negotiation. The client and
+server each send a protocol identifier. The client waits for the
+server protocol. Typically, the client sends the lesser of the client and
+server protocol. (This implies that protocols are ordered.) If the
+server doesn't support the protocol sent by the client, then it closes
+the connection with the client.
+
+On top of the low-level size-message protocol is the rpc
+protocol. Each message is a pickle of a 4-tuple:
+
+message id
+ Sequential integer message identifier, unique to the client, for
+ two-way calls.
+
+flags
+ This is either 0 for two-way calls or 1 for one-way calls.
+
+method name
+ For replies, this is the special string, ".reply".
+
+arguments
+ The method arguments.
+
+(This structure is a bit silly, but is retained for compatibility with
+old clients.)
+
+The API for the RPC mechanism is fairly low level. The API builds on
+zc.ngi. There are a pair of very simple connection adapters,
+zc.zeo.zrpc.ServerConnection and zc.zeo.zrpc.ClientConnection. The
+differences between these are very minor:
+
+- ServerConnections can only send one-way messages, while
+ ClientConnections can send both one-way and 2-way messages.
+
+- ServerConnections send their protocol immediately.
+ ClientConnections wait to read the protocol sent by the server and
+ then send the lesser of their protocol and the server's.
+
+To implement a server, we provide a server connection handler that
+calls the ServerConnection constructor. We will pass this an ngi
+connection, a protocol handler and a server protocol. We can omit the
+server protocol if the protocol handler has a protocol attribute.
+
+We need to create a server protocol handler and a server handler.
+We'll provide both as a simple class:
+
+ >>> class Server:
+ ... protocol = 'p42'
+ ...
+ ... def __init__(self, protocol):
+ ... print protocol
+ ... self.protocol = protocol
+ ... global server_handler
+ ... server_handler = self
+ ...
+ ... def handle_connection(self, connection):
+ ... self.connection = connection
+ ... connection.one_way('hello', 'client')
+ ...
+ ... def handle_one_way(self, connection, method, *arguments):
+ ... print 'Server handle_one_way', method, arguments
+ ... if method == 'fail':
+ ... raise ValueError(*arguments)
+ ...
+ ... def handle_call(self, connection, message_id, method, *arguments):
+ ... print 'Server handle_call', message_id, method, arguments
+ ... if method == 'fail':
+ ... raise ValueError(*arguments)
+ ...
+ ... def handle_close(self, connection, reason):
+ ... print 'Server closed', reason
+
+This is a very trivial class that mostly just prints what it is
+passed. It does send a one-way hello message when it is connected.
+Because this class can "handle" all protocols, we use the class itself
+as the protocol handler. It also saves the handler as a global
+variable so we can access it. We set the server protocol as the
+protocol attribute of the class so we don't have to pass it to the
+ServerConnection constructor.
+
+We create an event object that we set whenever we get a message. This
+is purely an aid for this doctest. Normal servers wouldn't do this.
+
+Now, we'll create a testing listener:
+
+ >>> import zc.ngi.testing, zc.zeo.zrpc
+ >>> listener = zc.ngi.testing.listener(
+ ... lambda connection:
+ ... zc.zeo.zrpc.ServerConnection(connection, Server)
+ ... )
+
+We passed the listener a callable that takes a connection and calls
+the ServerConnection constructor.
+
+Defining a client is fairly similar to defining a server:
+
+ >>> class Client:
+ ... def __init__(self, protocol):
+ ... print protocol
+ ... self.protocol = protocol
+ ... global client_handler
+ ... client_handler = self
+ ...
+ ... def handle_connection(self, connection):
+ ... self.connection = connection
+ ... connection.one_way('hello', 'server')
+ ...
+ ... def handle_one_way(self, connection, method, *arguments):
+ ... print 'Client handle_one_way', method, arguments
+ ...
+ ... def handle_close(self, connection, reason):
+ ... print 'Client closed', reason
+
+We need an ngi client connection handler that accepts an ngi
+connection and applies the ClientConnection adapter.
+
+ >>> class ConnectionHandler:
+ ... def __init__(self, addr, connector):
+ ... connector(addr, self)
+ ... def connected(self, connection):
+ ... self.connection = zc.zeo.zrpc.ClientConnection(
+ ... connection, Client)
+ ... def failed_connect(self, reason):
+ ... print 'connection failed', reason
+
+Now we can try creating a connection:
+
+ >>> _ = ConnectionHandler('', listener.connector)
+ p42
+ p42
+ Server handle_one_way hello ('server',)
+ Client handle_one_way hello ('client',)
+
+We saw one-way calls get sent. Let's try making a regular
+call. Normally, we'd do this with the call method, but we want to
+split the call activity into requesting and waiting for a reply so we
+can verify the request that was sent and provide a response directly:
+
+Now, let's call the server:
+
+ >>> message_id = client_handler.connection.request(
+ ... 'meth', 'arg1', 'arg2', 3)
+ Server handle_call 1 meth (('arg1', 'arg2', 3),)
+
+ >>> message_id
+ 1
+
+Here we see the resulting method and arguments get passed to the
+server.
+
+Now, we'll send a reply:
+
+ >>> server_handler.connection.result(1, 'a result')
+ >>> client_handler.connection.wait(message_id)
+ 'a result'
+
+Only basic objects (objects with built-in support in the pickle
+module) can be included as arguments or as results. Exception
+instances may be returned from server calls as exceptions.
+
+ >>> class C:
+ ... pass
+
+ >>> client_handler.connection.request('meth', C())
+ ... # doctest: +ELLIPSIS
+ Error test connection calling connection handler:
+ Traceback (most recent call last):
+ ...
+ UnpicklingError: Global and instance pickles are not supported.
+ Client closed closed
+ Server closed handle_input error
+ 2
+
+When we try to get a response, we'll get an error indicating that the
+connection was closed:
+
+ >>> client_handler.connection.wait(message_id)
+ Traceback (most recent call last):
+ ...
+ Closed
+
+
+The pickle problem is detected on the server when it tries to unpickle
+the message. We see that the connection was closed due to the server
+error. We need to reconnect:
+
+ >>> _ = ConnectionHandler('', listener.connector)
+ p42
+ p42
+ Server handle_one_way hello ('server',)
+ Client handle_one_way hello ('client',)
+
+Server application exceptions will be raised on the client:
+
+ >>> message_id = client_handler.connection.request('meth', 1, 2)
+ Server handle_call 1 meth ((1, 2),)
+
+ >>> message_id
+ 1
+
+ >>> server_handler.connection.exception(message_id, ValueError('eek'))
+ >>> client_handler.connection.wait(message_id)
+ Traceback (most recent call last):
+ ...
+ ValueError: eek
+
+
+ >>> message_id = client_handler.connection.request('fail', 1, 2)
+ Server handle_call 2 fail ((1, 2),)
+
+ >>> message_id
+ 2
+
+ >>> client_handler.connection.wait(message_id)
+ Traceback (most recent call last):
+ ...
+ ValueError: (1, 2)
+
+ >>> client_handler.connection.close()
+ Server closed closed
Property changes on: Sandbox/J1m/zc.zeo/src/zc/zeo/zrpc.txt
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the Checkins
mailing list