[Zodb-checkins] CVS: StandaloneZODB/ZEO/zrpc - connection.py:1.1.2.2

Jeremy Hylton jeremy@zope.com
Wed, 16 Jan 2002 20:40:40 -0500


Update of /cvs-repository/StandaloneZODB/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv13774/zrpc

Modified Files:
      Tag: Standby-branch
	connection.py 
Log Message:
Many small cleanups and improvements.

Get rid of Handler and set_caller() mechanism.  It was unused.

Replace _do_io() with two specialized methods, one of which is only
called by _call().  This should have been more obvious because the
behavior of _do_io() depended entirely on whether the wait kwarg with
0 or 1.

Use short_repr() in conjunction with low-level log() calls.

Extend doc string on Connection.

Simplify error/logging for return val from async call.

Add _ check to check_method() as suggested by Jim.

Extend ManagedServerConnection protocol to use notifyConnected().
This simplifies the newConnection() dance in StorageServer.



=== StandaloneZODB/ZEO/zrpc/connection.py 1.1.2.1 => 1.1.2.2 ===
 from ZEO import smac # XXX put smac in zrpc?
 from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
-from ZEO.zrpc.log import log
+from ZEO.zrpc.log import log, short_repr
 from ZEO.zrpc.marshal import Marshaller
 from ZEO.zrpc.trigger import trigger
 import zLOG
@@ -15,19 +15,6 @@
 REPLY = ".reply" # message name used for replies
 ASYNC = 1
 
-# XXX get rid of this class and use hasattr()
-class Handler:
-    """Base class used to handle RPC caller discovery"""
-
-    def set_caller(self, addr):
-        self.__caller = addr
-
-    def get_caller(self):
-        return self.__caller
-
-    def clear_caller(self):
-        self.__caller = None
-
 class Delay:
     """Used to delay response to client for synchronous calls
 
@@ -44,7 +31,7 @@
         self.send_reply(self.msgid, obj)
 
 class Connection(smac.SizedMessageAsyncConnection):
-    """Dispatcher for RPC on object
+    """Dispatcher for RPC on object on both sides of socket.
 
     The connection supports synchronous calls, which expect a return,
     and asynchronous calls that do not.
@@ -55,7 +42,11 @@
     A Connection is designed for use in a multithreaded application,
     where a synchronous call must block until a response is ready.
     The current design only allows a single synchronous call to be
-    outstanding. 
+    outstanding.
+
+    A socket connection between a client and a server allows either
+    side to invoke methods on the other side.  The processes on each
+    end of the socket use a Connection object to manage communication.
     """
 
     __super_init = smac.SizedMessageAsyncConnection.__init__
@@ -63,7 +54,7 @@
     __super_writable = smac.SizedMessageAsyncConnection.writable
 
     def __init__(self, sock, addr, obj=None):
-        self.obj = obj
+        self.obj = None
         self.marshal = Marshaller()
         self.closed = 0
         self.msgid = 0
@@ -83,19 +74,12 @@
         # waiting for a response
         self.__reply_lock = threading.Lock()
         self.__reply_lock.acquire()
-        # If the object implements the Handler interface (XXX checked
-        # by isinstance), it wants to know who the caller is.
-        if isinstance(obj, Handler):
-            self.set_caller = 1
-        else:
-            self.set_caller = 0
+        self.register_object(obj)
 
     def __repr__(self):
         return "<%s %s>" % (self.__class__.__name__, self.addr)
 
     def close(self):
-        caller = sys._getframe(1).f_code.co_name
-        log("close() caller=%s" % caller)
         if self.closed:
             return
         self.closed = 1
@@ -103,6 +87,7 @@
         self.__super_close()
 
     def close_trigger(self):
+        # overridden by ManagedConnection
         if self.trigger is not None:
             self.trigger.close()
 
@@ -122,7 +107,7 @@
 
         if __debug__:
             log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
-                                              repr(args)[:40]),
+                                              short_repr(args)),
                 level=zLOG.DEBUG)
         if name == REPLY:
             self.handle_reply(msgid, flags, args)
@@ -137,28 +122,13 @@
         self.__reply_lock.release() # will fail if lock is unlocked
 
     def handle_request(self, msgid, flags, name, args):
-        if __debug__:
-            log("call %s%s on %s" % (name, repr(args)[:40], repr(self.obj)),
-                zLOG.DEBUG)
         if not self.check_method(name):
             raise ZRPCError("Invalid method name: %s on %s" % (name,
                                                                `self.obj`))
 
         meth = getattr(self.obj, name)
         try:
-            if self.set_caller:
-                self.obj.set_caller(self)
-                try:
-                    ret = meth(*args)
-                finally:
-                    self.obj.clear_caller()
-            else:
-                ret = meth(*args)
-        except (POSException.UndoError,
-                POSException.VersionCommitError), msg:
-            error = sys.exc_info()[:2]
-            log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
-            return self.return_error(msgid, flags, error[0], error[1])
+            ret = meth(*args)
         except Exception, msg:
             error = sys.exc_info()[:2]
             log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
@@ -166,12 +136,11 @@
 
         if flags & ASYNC:
             if ret is not None:
-                log("async method %s returned value %s" % (name, repr(ret)),
-                    zLOG.ERROR)
-                raise ZRPCError("async method returned value")
+                raise ZRPCError("async method %s returned value %s" %
+                                (name, repr(ret))
         else:
             if __debug__:
-                log("%s return %s" % (name, repr(ret)[:40]), zLOG.DEBUG)
+                log("%s return %s" % (name, short_repr(ret)), zLOG.DEBUG)
             if isinstance(ret, Delay):
                 ret.set_sender(msgid, self.send_reply)
             else:
@@ -182,12 +151,12 @@
         self.close()
 
     def log_error(self, msg="No error message supplied"):
-        error = sys.exc_info()
-        log(msg, zLOG.ERROR, error=error)
-        del error
+        log(msg, zLOG.ERROR, error=sys.exc_info())
 
     def check_method(self, name):
-        # XXX minimal security check should go here: Is name exported?
+        # XXX Is this sufficient "security" for now?
+        if name.startswith('_'):
+            return None
         return hasattr(self.obj, name)
 
     def send_reply(self, msgid, ret):
@@ -210,13 +179,10 @@
             err = ZRPCError("Couldn't pickle error %s" % `err_value`)
             msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
         self.message_output(msg)
-        self._do_io()
+        self._do_async_poll()
 
-    # The next two methods are used by clients to invoke methods on
-    # remote objects  
-
-    # XXX Should revise design to allow multiple outstanding
-    # synchronous calls
+    # The next two public methods (call and callAsync) are used by
+    # clients to invoke methods on remote objects
 
     def call(self, method, *args):
         self.__call_lock.acquire()
@@ -234,10 +200,12 @@
             log("send msg: %d, 0, %s, ..." % (msgid, method))
         self.message_output(self.marshal.encode(msgid, 0, method, args))
 
+        # XXX implementation of promises starts here
+
         self.__reply = None
-        # lock is currently held
-        self._do_io(wait=1)
-        # lock is held again...
+        # reply lock is currently held
+        self._do_async_loop()
+        # reply lock is held again...
         r_msgid, r_flags, r_args = self.__reply
         self.__reply_lock.acquire()
         assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
@@ -263,7 +231,10 @@
         if __debug__:
             log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
         self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
-        self._do_io()
+        # XXX The message won't go out right away in this case.  It
+        # will wait for the asyncore loop to get control again.  Seems
+        # okay to comment our for now, but need to understand better.
+##        self._do_async_poll()
 
     # handle IO, possibly in async mode
 
@@ -274,8 +245,6 @@
         # Connections to be leaked.
 
     def set_async(self, map):
-        # XXX do we need a lock around this?  I'm not sure there is
-        # any harm to a race with _do_io().
         self.trigger = trigger()
         self.thr_async = 1
 
@@ -284,41 +253,45 @@
             return 1
         else:
             return 0
-            
-    def _do_io(self, wait=0): # XXX need better name
-        # XXX invariant? lock must be held when calling with wait==1
-        # otherwise, in non-async mode, there will be no poll
 
+    def _do_async_loop(self):
+        "Invoke asyncore mainloop and wait for reply."
         if __debug__:
-            log("_do_io(wait=%d), async=%d" % (wait, self.is_async()),
+            log("_do_async_loop() async=%d" % self.is_async(),
                 level=zLOG.DEBUG)
         if self.is_async():
             self.trigger.pull_trigger()
-            if wait:
-                self.__reply_lock.acquire()
-                # wait until reply...
-                self.__reply_lock.release()
+            self.__reply_lock.acquire()
+            # wait until reply...
         else:
-            if wait:
-                # do loop only if lock is already acquired
-                while not self.__reply_lock.acquire(0):
-                    asyncore.poll(10.0, self._map)
-                    if self.closed:
-                        raise DisconnectedError()
-                self.__reply_lock.release()
-            else:
-                asyncore.poll(0.0, self._map)
+            # Do loop only if lock is already acquired.  XXX But can't
+            # we already guarantee that the lock is already acquired?
+            while not self.__reply_lock.acquire(0):
+                asyncore.poll(10.0, self._map)
+                if self.closed:
+                    raise DisconnectedError()
+        self.__reply_lock.release()
+            
+    def _do_async_poll(self, wait_for_reply=0):
+        "Invoke asyncore mainloop to get pending message out."
 
-        # XXX it seems that we need to release before returning if
-        # called with wait==1.  perhaps the caller need not acquire
-        # upon return...
+        if __debug__:
+            log("_do_async_poll(), async=%d" % self.is_async(),
+                level=zLOG.DEBUG)
+        if self.is_async():
+            self.trigger.pull_trigger()
+        else:
+            asyncore.poll(0.0, self._map)
 
 class ServerConnection(Connection):
-    # XXX this is a hack
-    def _do_io(self, wait=0):
+
+    def _do_async_poll(self, wait=0):
         """If this is a server, there is no explicit IO to do"""
         pass
 
+    # XXX _do_async_loop is never called.  Should it be defined as
+    # above anyway?
+
 class ManagedServerConnection(ServerConnection):
     """A connection that notifies its ConnectionManager of closing"""
     __super_init = Connection.__init__
@@ -327,6 +300,7 @@
     def __init__(self, sock, addr, obj, mgr):
         self.__mgr = mgr
         self.__super_init(sock, addr, obj)
+        obj.notifyConnected(self)
 
     def close(self):
         self.__super_close()