[Checkins] SVN: ZODB/trunk/src/ZEO/ Provide shorter code path for loads, which are the most common operation.

Jim Fulton jim at zope.com
Wed Apr 6 10:23:24 EDT 2011


Log message for revision 121306:
  Provide shorter code path for loads, which are the most common operation.
  
  Simplified and optimized marshalling code.
  

Changed:
  U   ZODB/trunk/src/ZEO/tests/ConnectionTests.py
  U   ZODB/trunk/src/ZEO/zrpc/connection.py
  U   ZODB/trunk/src/ZEO/zrpc/marshal.py

-=-
Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2011-04-06 13:50:21 UTC (rev 121305)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2011-04-06 14:23:24 UTC (rev 121306)
@@ -24,7 +24,7 @@
 import ZEO.ServerStub
 from ZEO.ClientStorage import ClientStorage
 from ZEO.Exceptions import ClientDisconnected
-from ZEO.zrpc.marshal import Marshaller
+from ZEO.zrpc.marshal import encode
 from ZEO.tests import forker
 
 from ZODB.DB import DB
@@ -475,7 +475,7 @@
         class Hack:
             pass
 
-        msg = Marshaller().encode(1, 0, "foo", (Hack(),))
+        msg = encode(1, 0, "foo", (Hack(),))
         self._bad_message(msg)
         del Hack
 

Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py	2011-04-06 13:50:21 UTC (rev 121305)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py	2011-04-06 14:23:24 UTC (rev 121306)
@@ -13,11 +13,13 @@
 ##############################################################################
 import asyncore
 import atexit
+import cPickle
 import errno
 import select
 import sys
 import threading
 import logging
+import ZEO.zrpc.marshal
 
 import traceback, time
 
@@ -25,7 +27,6 @@
 
 from ZEO.zrpc import smac
 from ZEO.zrpc.error import ZRPCError, DisconnectedError
-from ZEO.zrpc.marshal import Marshaller, ServerMarshaller
 from ZEO.zrpc.log import short_repr, log
 from ZODB.loglevels import BLATHER, TRACE
 import ZODB.POSException
@@ -287,7 +288,10 @@
     # our peer.
     def __init__(self, sock, addr, obj, tag, map=None):
         self.obj = None
-        self.marshal = Marshaller()
+        self.decode = ZEO.zrpc.marshal.decode
+        self.encode = ZEO.zrpc.marshal.encode
+        self.fast_encode = ZEO.zrpc.marshal.fast_encode
+
         self.closed = False
         self.peer_protocol_version = None # set in recv_handshake()
 
@@ -413,13 +417,34 @@
         # will raise an exception.  The exception will ultimately
         # result in asycnore calling handle_error(), which will
         # close the connection.
-        msgid, async, name, args = self.marshal.decode(message)
+        msgid, async, name, args = self.decode(message)
 
         if debug_zrpc:
             self.log("recv msg: %s, %s, %s, %s" % (msgid, async, name,
                                                    short_repr(args)),
                      level=TRACE)
-        if name == REPLY:
+
+        if name == 'loadEx':
+
+            # Special case and inline the heck out of load case:
+            try:
+                ret = self.obj.loadEx(*args)
+            except (SystemExit, KeyboardInterrupt):
+                raise
+            except Exception, msg:
+                if not isinstance(msg, self.unlogged_exception_types):
+                    self.log("%s() raised exception: %s" % (name, msg),
+                             logging.ERROR, exc_info=True)
+                self.return_error(msgid, *sys.exc_info()[:2])
+            else:
+                try:
+                    self.message_output(self.fast_encode(msgid, 0, REPLY, ret))
+                    self.poll()
+                except:
+                    # Fall back to normal version for better error handling
+                    self.send_reply(msgid, ret)
+
+        elif name == REPLY:
             assert not async
             self.handle_reply(msgid, args)
         else:
@@ -493,14 +518,14 @@
         # it's acceptable -- we really do want to catch every exception
         # cPickle may raise.
         try:
-            msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
+            msg = self.encode(msgid, 0, REPLY, (err_type, err_value))
         except: # see above
             try:
                 r = short_repr(err_value)
             except:
                 r = "<unreprable>"
             err = ZRPCError("Couldn't pickle error %.100s" % r)
-            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+            msg = self.encode(msgid, 0, REPLY, (ZRPCError, err))
         self.message_output(msg)
         self.poll()
 
@@ -527,7 +552,7 @@
         if debug_zrpc:
             self.log("send msg: %d, %d, %s, ..." % (msgid, async, method),
                      level=TRACE)
-        buf = self.marshal.encode(msgid, async, method, args)
+        buf = self.encode(msgid, async, method, args)
         self.message_output(buf)
         return msgid
 
@@ -560,7 +585,7 @@
         The calls will not be interleaved with other calls from the same
         client.
         """
-        self.message_output(self.marshal.encode(0, 1, method, args)
+        self.message_output(self.encode(0, 1, method, args)
                             for method, args in iterator)
 
     def handle_reply(self, msgid, ret):
@@ -573,6 +598,8 @@
         self.trigger.pull_trigger()
 
 
+# import cProfile, time
+
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
 
@@ -583,7 +610,9 @@
         self.mgr = mgr
         map = {}
         Connection.__init__(self, sock, addr, obj, 'S', map=map)
-        self.marshal = ServerMarshaller()
+
+        self.decode = ZEO.zrpc.marshal.server_decode
+
         self.trigger = ZEO.zrpc.trigger.trigger(map)
         self.call_from_thread = self.trigger.pull_trigger
 
@@ -591,6 +620,15 @@
         t.setDaemon(True)
         t.start()
 
+        # self.profile = cProfile.Profile()
+
+    # def message_input(self, message):
+    #     self.profile.enable()
+    #     try:
+    #         Connection.message_input(self, message)
+    #     finally:
+    #         self.profile.disable()
+
     def handshake(self):
         # Send the server's preferred protocol to the client.
         self.message_output(self.current_protocol)
@@ -602,6 +640,7 @@
     def close(self):
         self.obj.notifyDisconnected()
         Connection.close(self)
+        # self.profile.dump_stats(str(time.time())+'.stats')
 
     def send_reply(self, msgid, ret, immediately=True):
         # encode() can pass on a wide variety of exceptions from cPickle.
@@ -609,14 +648,14 @@
         # it's acceptable -- we really do want to catch every exception
         # cPickle may raise.
         try:
-            msg = self.marshal.encode(msgid, 0, REPLY, ret)
+            msg = self.encode(msgid, 0, REPLY, ret)
         except: # see above
             try:
                 r = short_repr(ret)
             except:
                 r = "<unreprable>"
             err = ZRPCError("Couldn't pickle return %.100s" % r)
-            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+            msg = self.encode(msgid, 0, REPLY, (ZRPCError, err))
         self.message_output(msg)
         if immediately:
             self.poll()

Modified: ZODB/trunk/src/ZEO/zrpc/marshal.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/marshal.py	2011-04-06 13:50:21 UTC (rev 121305)
+++ ZODB/trunk/src/ZEO/zrpc/marshal.py	2011-04-06 14:23:24 UTC (rev 121306)
@@ -11,61 +11,66 @@
 # FOR A PARTICULAR PURPOSE
 #
 ##############################################################################
-import cPickle
+from cPickle import Unpickler, Pickler
 from cStringIO import StringIO
 import logging
 
 from ZEO.zrpc.error import ZRPCError
 from ZEO.zrpc.log import log, short_repr
 
-class Marshaller:
-    """Marshal requests and replies to second across network"""
+def encode(*args): # args: (msgid, flags, name, args)
+    # (We used to have a global pickler, but that's not thread-safe. :-( )
 
-    def encode(self, msgid, flags, name, args):
-        """Returns an encoded message"""
-        # (We used to have a global pickler, but that's not thread-safe. :-( )
-        # Note that args may contain very large binary pickles already; for
-        # this reason, it's important to use proto 1 (or higher) pickles here
-        # too.  For a long time, this used proto 0 pickles, and that can
-        # bloat our pickle to 4x the size (due to high-bit and control bytes
-        # being represented by \xij escapes in proto 0).
-        # Undocumented:  cPickle.Pickler accepts a lone protocol argument;
-        # pickle.py does not.
-        pickler = cPickle.Pickler(1)
-        pickler.fast = 1
+    # It's not thread safe if, in the course of pickling, we call the
+    # Python interpreter, which releases the GIL.
 
-        # Undocumented:  pickler.dump(), for a cPickle.Pickler, takes
-        # an optional boolean argument.  When true, it returns the pickle;
-        # when false or unspecified, it returns the pickler object itself.
-        # pickle.py does none of this.
-        return pickler.dump((msgid, flags, name, args), 1)
+    # Note that args may contain very large binary pickles already; for
+    # this reason, it's important to use proto 1 (or higher) pickles here
+    # too.  For a long time, this used proto 0 pickles, and that can
+    # bloat our pickle to 4x the size (due to high-bit and control bytes
+    # being represented by \xij escapes in proto 0).
+    # Undocumented:  cPickle.Pickler accepts a lone protocol argument;
+    # pickle.py does not.
+    pickler = Pickler(1)
+    pickler.fast = 1
+    return pickler.dump(args, 1)
 
-    def decode(self, msg):
-        """Decodes msg and returns its parts"""
-        unpickler = cPickle.Unpickler(StringIO(msg))
-        unpickler.find_global = find_global
 
-        try:
-            return unpickler.load() # msgid, flags, name, args
-        except:
-            log("can't decode message: %s" % short_repr(msg),
-                level=logging.ERROR)
-            raise
+@apply
+def fast_encode():
+    # Only use in cases where you *know* the data contains only basic
+    # Python objects
+    pickler = Pickler(1)
+    pickler.fast = 1
+    dump = pickler.dump
+    def fast_encode(*args):
+        return dump(args, 1)
+    return fast_encode
 
-class ServerMarshaller(Marshaller):
+def decode(msg):
+    """Decodes msg and returns its parts"""
+    unpickler = Unpickler(StringIO(msg))
+    unpickler.find_global = find_global
 
-    def decode(self, msg):
-        """Decodes msg and returns its parts"""
-        unpickler = cPickle.Unpickler(StringIO(msg))
-        unpickler.find_global = server_find_global
+    try:
+        return unpickler.load() # msgid, flags, name, args
+    except:
+        log("can't decode message: %s" % short_repr(msg),
+            level=logging.ERROR)
+        raise
 
-        try:
-            return unpickler.load() # msgid, flags, name, args
-        except:
-            log("can't decode message: %s" % short_repr(msg),
-                level=logging.ERROR)
-            raise
+def server_decode(msg):
+    """Decodes msg and returns its parts"""
+    unpickler = Unpickler(StringIO(msg))
+    unpickler.find_global = server_find_global
 
+    try:
+        return unpickler.load() # msgid, flags, name, args
+    except:
+        log("can't decode message: %s" % short_repr(msg),
+            level=logging.ERROR)
+        raise
+
 _globals = globals()
 _silly = ('__doc__',)
 



More information about the checkins mailing list