[Checkins] SVN: ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py Distributed client- or server-specific methods to appropriate subclasses.

Jim Fulton jim at zope.com
Wed Jan 27 13:07:57 EST 2010


Log message for revision 108575:
  Distributed client- or server-specific methods to appropriate subclasses.
  

Changed:
  U   ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py

-=-
Modified: ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py	2010-01-27 18:06:18 UTC (rev 108574)
+++ ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py	2010-01-27 18:07:57 UTC (rev 108575)
@@ -189,7 +189,7 @@
 
     def error(self, exc_info):
         log("Error raised in delayed method", logging.ERROR, exc_info=True)
-        self.return_error(self.msgid, 0, *exc_info[:2])
+        self.return_error(self.msgid, *exc_info[:2])
 
 class MTDelay(Delay):
 
@@ -541,21 +541,11 @@
                                                    short_repr(args)),
                      level=TRACE)
         if name == REPLY:
-            self.handle_reply(msgid, async, args)
+            assert not async
+            self.handle_reply(msgid, args)
         else:
             self.handle_request(msgid, async, name, args)
 
-    def handle_reply(self, msgid, async, args):
-        if debug_zrpc:
-            self.log("recv reply: %s, %s, %s"
-                     % (msgid, async, short_repr(args)), level=TRACE)
-        self.replies_cond.acquire()
-        try:
-            self.replies[msgid] = args
-            self.replies_cond.notifyAll()
-        finally:
-            self.replies_cond.release()
-
     def handle_request(self, msgid, async, name, args):
         obj = self.obj
 
@@ -587,8 +577,14 @@
                 self.log("%s() raised exception: %s" % (name, msg),
                          logging.ERROR, exc_info=True)
             error = sys.exc_info()[:2]
-            return self.return_error(msgid, async, *error)
+            if async:
+                self.log("Asynchronous call raised exception: %s" % self,
+                         level=logging.ERROR, exc_info=True)
+            else:
+                self.return_error(msgid, *error)
+            return
 
+
         if async:
             if ret is not None:
                 raise ZRPCError("async method %s returned value %s" %
@@ -606,35 +602,11 @@
             self.__super_setSessionKey(self.delay_sesskey)
             self.delay_sesskey = None
 
-    def handle_error(self):
-        if sys.exc_info()[0] == SystemExit:
-            raise sys.exc_info()
-        self.log("Error caught in asyncore",
-                 level=logging.ERROR, exc_info=True)
-        self.close()
+    def return_error(self, msgid, err_type, err_value):
+        # Note that, ideally, this should be defined solely for
+        # servers, but a test arranges to get it called on
+        # a client. Too much trouble to fix it now. :/
 
-    def send_reply(self, msgid, ret):
-        # encode() can pass on a wide variety of exceptions from cPickle.
-        # While a bare `except` is generally poor practice, in this case
-        # it's acceptable -- we really do want to catch every exception
-        # cPickle may raise.
-        try:
-            msg = self.marshal.encode(msgid, 0, REPLY, ret)
-        except: # see above
-            try:
-                r = short_repr(ret)
-            except:
-                r = "<unreprable>"
-            err = ZRPCError("Couldn't pickle return %.100s" % r)
-            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
-        self.message_output(msg)
-        self.poll()
-
-    def return_error(self, msgid, async, err_type, err_value):
-        if async:
-            self.log("Asynchronous call raised exception: %s" % self,
-                     level=logging.ERROR, exc_info=True)
-            return
         if not isinstance(err_value, Exception):
             err_value = err_type, err_value
 
@@ -654,6 +626,13 @@
         self.message_output(msg)
         self.poll()
 
+    def handle_error(self):
+        if sys.exc_info()[0] == SystemExit:
+            raise sys.exc_info()
+        self.log("Error caught in asyncore",
+                 level=logging.ERROR, exc_info=True)
+        self.close()
+
     def setSessionKey(self, key):
         if self.waiting_for_reply:
             self.delay_sesskey = key
@@ -680,7 +659,7 @@
                      level=TRACE)
         return self.marshal.encode(msgid, async, method, args)
 
-    def send_call(self, method, args, async):
+    def send_call(self, method, args, async=False):
         # send a message and return its msgid
         msgid = self.__new_msgid()
         if debug_zrpc:
@@ -690,39 +669,6 @@
         self.message_output(buf)
         return msgid
 
-    def call(self, method, *args):
-        if self.closed:
-            raise DisconnectedError()
-        msgid = self.send_call(method, args, 0)
-        r_args = self.wait(msgid)
-        if (isinstance(r_args, tuple) and len(r_args) > 1
-            and type(r_args[0]) == exception_type_type
-            and issubclass(r_args[0], Exception)):
-            inst = r_args[1]
-            raise inst # error raised by server
-        else:
-            return r_args
-
-    # For testing purposes, it is useful to begin a synchronous call
-    # but not block waiting for its response.
-
-    def _deferred_call(self, method, *args):
-        if self.closed:
-            raise DisconnectedError()
-        msgid = self.send_call(method, args, 0)
-        self.trigger.pull_trigger()
-        return msgid
-
-    def _deferred_wait(self, msgid):
-        r_args = self.wait(msgid)
-        if (isinstance(r_args, tuple)
-            and type(r_args[0]) == exception_type_type
-            and issubclass(r_args[0], Exception)):
-            inst = r_args[1]
-            raise inst # error raised by server
-        else:
-            return r_args
-
     def callAsync(self, method, *args):
         if self.closed:
             raise DisconnectedError()
@@ -750,33 +696,9 @@
             yield self.__call_message(method, args, 1)
 
 
-    def wait(self, msgid):
-        """Invoke asyncore mainloop and wait for reply."""
-        if debug_zrpc:
-            self.log("wait(%d)" % msgid, level=TRACE)
+    def handle_reply(self, msgid, ret):
+        assert msgid == -1 and ret is None
 
-        self.trigger.pull_trigger()
-
-        # Delay used when we call asyncore.poll() directly.
-        # Start with a 1 msec delay, double until 1 sec.
-        delay = 0.001
-
-        self.replies_cond.acquire()
-        try:
-            while 1:
-                if self.closed:
-                    raise DisconnectedError()
-                reply = self.replies.get(msgid, self)
-                if reply is not self:
-                    del self.replies[msgid]
-                    if debug_zrpc:
-                        self.log("wait(%d): reply=%s" %
-                                 (msgid, short_repr(reply)), level=TRACE)
-                    return reply
-                self.replies_cond.wait()
-        finally:
-            self.replies_cond.release()
-
     def flush(self):
         """Invoke poll() until the output buffer is empty."""
         if debug_zrpc:
@@ -791,7 +713,6 @@
         self.trigger.pull_trigger()
 
 
-
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
 
@@ -818,6 +739,23 @@
         self.obj.notifyDisconnected()
         Connection.close(self)
 
+    def send_reply(self, msgid, ret):
+        # encode() can pass on a wide variety of exceptions from cPickle.
+        # While a bare `except` is generally poor practice, in this case
+        # it's acceptable -- we really do want to catch every exception
+        # cPickle may raise.
+        try:
+            msg = self.marshal.encode(msgid, 0, REPLY, ret)
+        except: # see above
+            try:
+                r = short_repr(ret)
+            except:
+                r = "<unreprable>"
+            err = ZRPCError("Couldn't pickle return %.100s" % r)
+            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+        self.message_output(msg)
+        self.poll()
+
 class ManagedClientConnection(Connection):
     """Client-side Connection subclass."""
     __super_init = Connection.__init__
@@ -887,3 +825,79 @@
             self.queue_output = False
         finally:
             self.output_lock.release()
+
+    def call(self, method, *args):
+        if self.closed:
+            raise DisconnectedError()
+        msgid = self.send_call(method, args)
+        r_args = self.wait(msgid)
+        if (isinstance(r_args, tuple) and len(r_args) > 1
+            and type(r_args[0]) == exception_type_type
+            and issubclass(r_args[0], Exception)):
+            inst = r_args[1]
+            raise inst # error raised by server
+        else:
+            return r_args
+
+    def wait(self, msgid):
+        """Invoke asyncore mainloop and wait for reply."""
+        if debug_zrpc:
+            self.log("wait(%d)" % msgid, level=TRACE)
+
+        self.trigger.pull_trigger()
+
+        # Delay used when we call asyncore.poll() directly.
+        # Start with a 1 msec delay, double until 1 sec.
+        delay = 0.001
+
+        self.replies_cond.acquire()
+        try:
+            while 1:
+                if self.closed:
+                    raise DisconnectedError()
+                reply = self.replies.get(msgid, self)
+                if reply is not self:
+                    del self.replies[msgid]
+                    if debug_zrpc:
+                        self.log("wait(%d): reply=%s" %
+                                 (msgid, short_repr(reply)), level=TRACE)
+                    return reply
+                self.replies_cond.wait()
+        finally:
+            self.replies_cond.release()
+
+    # For testing purposes, it is useful to begin a synchronous call
+    # but not block waiting for its response.
+
+    def _deferred_call(self, method, *args):
+        if self.closed:
+            raise DisconnectedError()
+        msgid = self.send_call(method, args)
+        self.trigger.pull_trigger()
+        return msgid
+
+    def _deferred_wait(self, msgid):
+        r_args = self.wait(msgid)
+        if (isinstance(r_args, tuple)
+            and type(r_args[0]) == exception_type_type
+            and issubclass(r_args[0], Exception)):
+            inst = r_args[1]
+            raise inst # error raised by server
+        else:
+            return r_args
+
+    def handle_reply(self, msgid, args):
+        if debug_zrpc:
+            self.log("recv reply: %s, %s"
+                     % (msgid, short_repr(args)), level=TRACE)
+        self.replies_cond.acquire()
+        try:
+            self.replies[msgid] = args
+            self.replies_cond.notifyAll()
+        finally:
+            self.replies_cond.release()
+
+    def send_reply(self, msgid, ret):
+        # Whimper. Used to send heartbeat
+        assert msgid == -1 and ret is None
+        self.message_output('(J\xff\xff\xff\xffK\x00U\x06.replyNt.')



More information about the checkins mailing list