[Checkins] SVN: ZODB/branches/jim-zrpc/src/ZEO/ Add a call_from_thread convenience function to connections.

Jim Fulton jim at zope.com
Wed Jan 27 13:43:14 EST 2010


Log message for revision 108577:
  Add a call_from_thread convenience function to connections.
  
  Have MTDelay objects use call_from_thread to send results.
  
  Now, on the server, all network output comes from the asyncore loop.
  This means we no longer need to wake the async loop on servers when
  outputting and, if we choose, we can also immediately push data down
  the socket without waiting for the main loop to get around to it.
  

Changed:
  U   ZODB/branches/jim-zrpc/src/ZEO/tests/testZEO2.py
  U   ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py

-=-
Modified: ZODB/branches/jim-zrpc/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/branches/jim-zrpc/src/ZEO/tests/testZEO2.py	2010-01-27 18:18:56 UTC (rev 108576)
+++ ZODB/branches/jim-zrpc/src/ZEO/tests/testZEO2.py	2010-01-27 18:43:14 UTC (rev 108577)
@@ -78,9 +78,10 @@
     >>> zs2.storeBlobEnd(oid, serial, data, '1')
     >>> delay = zs2.vote('1')
 
-    >>> def send_reply(id, reply):
-    ...     print 'reply', id, reply
-    >>> delay.set_sender(1, send_reply, None)
+    >>> class Sender:
+    ...     def send_reply(self, id, reply):
+    ...         print 'reply', id, reply
+    >>> delay.set_sender(1, Sender())
 
     >>> logger = logging.getLogger('ZEO')
     >>> handler = logging.StreamHandler(sys.stdout)

Modified: ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py	2010-01-27 18:18:56 UTC (rev 108576)
+++ ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py	2010-01-27 18:43:14 UTC (rev 108577)
@@ -179,34 +179,33 @@
     the mainloop from sending a response.
     """
 
-    def set_sender(self, msgid, send_reply, return_error):
+    def set_sender(self, msgid, conn):
         self.msgid = msgid
-        self.send_reply = send_reply
-        self.return_error = return_error
+        self.conn = conn
 
     def reply(self, obj):
-        self.send_reply(self.msgid, obj)
+        self.conn.send_reply(self.msgid, obj)
 
     def error(self, exc_info):
         log("Error raised in delayed method", logging.ERROR, exc_info=True)
-        self.return_error(self.msgid, *exc_info[:2])
+        self.conn.return_error(self.msgid, *exc_info[:2])
 
 class MTDelay(Delay):
 
     def __init__(self):
         self.ready = threading.Event()
 
-    def set_sender(self, msgid, send_reply, return_error):
-        Delay.set_sender(self, msgid, send_reply, return_error)
+    def set_sender(self, *args):
+        Delay.set_sender(self, *args)
         self.ready.set()
 
     def reply(self, obj):
         self.ready.wait()
-        Delay.reply(self, obj)
+        self.conn.call_from_thread(self.conn.send_reply, self.msgid, obj)
 
     def error(self, exc_info):
         self.ready.wait()
-        Delay.error(self, exc_info)
+        self.conn.call_from_thread(Delay.error, self, exc_info)
 
 # PROTOCOL NEGOTIATION
 #
@@ -594,7 +593,7 @@
                 self.log("%s returns %s" % (name, short_repr(ret)),
                          logging.DEBUG)
             if isinstance(ret, Delay):
-                ret.set_sender(msgid, self.send_reply, self.return_error)
+                ret.set_sender(msgid, self)
             else:
                 self.send_reply(msgid, ret)
 
@@ -707,6 +706,7 @@
 
     # Servers use a shared server trigger that uses the asyncore socket map
     trigger = trigger()
+    call_from_thread = trigger.pull_trigger
 
     def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
@@ -749,6 +749,7 @@
     base_message_output = Connection.message_output
 
     trigger = client_trigger
+    call_from_thread = trigger.pull_trigger
 
     def __init__(self, sock, addr, mgr):
         self.mgr = mgr



More information about the checkins mailing list