[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py Refined queuing logic.

Jim Fulton jim at zope.com
Tue May 15 16:34:34 EDT 2007


Log message for revision 75780:
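  Refined queuing logic: handle_write now moves queued messages into
  the output buffer only until more than SEND_SIZE bytes are pending
  (advancing iterator messages one item at a time) instead of draining
  the whole message queue up front, and it closes the connection when
  the close marker reaches the head of the message queue.
  __message_output now takes the output list as an argument and
  returns the number of bytes it queued.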
  

Changed:
  U   ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py

-=-
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py	2007-05-15 20:20:47 UTC (rev 75779)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py	2007-05-15 20:34:34 UTC (rev 75780)
@@ -245,8 +245,26 @@
     def handle_write(self):
         output = self.__output
         messages = self.__output_messages
+        while output or messages:
 
-        while output or messages:
+            # Process queued messages until we have enough output
+            size = sum((len(s) for s in output))
+            while (size <= SEND_SIZE) and messages:
+                message = messages[0]
+                if message.__class__ is str:
+                    size += self.__message_output(messages.pop(0), output)
+                elif message is _close_marker:
+                    del messages[:]
+                    del output[:]
+                    return self.close()
+                else:
+                    try:
+                        message = message.next()
+                    except StopIteration:
+                        messages.pop(0)
+                    else:
+                        size += self.__message_output(message, output)
+
             # Accumulate output into a single string so that we avoid
             # multiple send() calls, but avoid accumulating too much
             # data.  If we send a very small string and have more data
@@ -254,28 +272,9 @@
             # unfortunate interaction between the Nagle algorithm and
             # delayed acks.  If we send a very large string, only a
             # portion of it will actually be delivered at a time.
-
-            while messages:
-                message = messages.pop(0)
-                if message.__class__ is str:
-                    self.__message_output(message)
-                elif message is _close_marker:
-                    output.append(message)
-                else:
-                    for m in message:
-                        if m:
-                            self.__message_output(m)
-
-
             l = 0
             for i in range(len(output)):
-                try:
-                    l += len(output[i])
-                except TypeError:
-                    # We had an output marker, close the connection
-                    assert output[i] is _close_marker
-                    return self.close()
-
+                l += len(output[i])
                 if l > SEND_SIZE:
                     break
 
@@ -290,7 +289,8 @@
                 if err[0] in expected_socket_write_errors:
                     break # we couldn't write anything
                 raise
-            if n < len(v):
+            
+            if n < l:
                 output.insert(0, v[n:])
                 break # we can't write any more
 
@@ -303,20 +303,25 @@
                 "This action is temporarily unavailable.<p>")
         self.__output_messages.append(message)
 
-    def __message_output(self, message):
+    def __message_output(self, message, output):
         # do two separate appends to avoid copying the message string
+        size = 4        
         if self.__hmac_send:
-            self.__output.append(struct.pack(">I", len(message) | MAC_BIT))
+            output.append(struct.pack(">I", len(message) | MAC_BIT))
             self.__hmac_send.update(message)
-            self.__output.append(self.__hmac_send.digest())
+            output.append(self.__hmac_send.digest())
+            size += 20
         else:
-            self.__output.append(struct.pack(">I", len(message)))
+            output.append(struct.pack(">I", len(message)))
+
         if len(message) <= SEND_SIZE:
-            self.__output.append(message)
+            output.append(message)
         else:
             for i in range(0, len(message), SEND_SIZE):
-                self.__output.append(message[i:i+SEND_SIZE])
+                output.append(message[i:i+SEND_SIZE])
 
+        return size + len(message)
+
     def close(self):
         if not self.__closed:
             self.__closed = True



More information about the Zodb-checkins mailing list