[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py *** empty log message ***

Jim Fulton jim at zope.com
Sat Oct 3 09:53:51 EDT 2009


Log message for revision 104762:
  *** empty log message ***

Changed:
  U   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py

-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py	2009-10-03 09:47:48 UTC (rev 104761)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py	2009-10-03 13:53:51 UTC (rev 104762)
@@ -273,17 +273,13 @@
         assert self.queueing
         with self.lock:
             self.queueing = False
-            queue = self.queue
-            while queue and not self.queueing:
-                self._call(*queue.pop(0))
+            self._process()
 
     def handle_input(self, connection, message):
         if self.protocol is None:
             self.protocol = message
             connection.write(self.protocol) # Echo it back
-            self._call(1, 'set_log_label_from_client',
-                       ('handler:%s' % self.session, ))
-            self._call(0, 'register', ('1', 0))
+            self.queue.insert(0, (0, 'register', ('1', 0)))
             self.stop_queueing()
             self.event.set()
             return
@@ -318,34 +314,30 @@
                 n, t = self.times.get(op, zz)
                 self.times[op] = n+1, t+elapsed
 
-            if op == 'vote':
-                self.stop_queueing()
+            self.stop_queueing()
 
-    def call(self, *args):
+    def call(self, async, op, args):
+        self.queue.append((async, op, args))
+        if op == 'vote':
+            self.queue.append((0, 'tpc_abort', args))
+
         with self.lock:
-            self._call(*args)
+            self._process()
 
-    def _call(self, async, op, args, processing_queue=False):
-        if self.queueing:
-            self.queue.append((async, op, args))
-            return
+    def _process(self):
+        while not self.queueing:
+            async, op, args = self.queue.pop(0)
 
-        if op == 'vote':
-            #print self.session, 'vote'
-            self.voting = self.queueing = True
-            if not processing_queue:
-                self.queue.insert(0, (0, 'tpc_abort', args))
-            else:
-                assert self.queue[0][1] == 'tpc_abort'
+            self.msgid += 1
 
-        self.msgid += 1
-        #print '  call', self.session, self.msgid, async, op
-        if not async:
-            #print '    prev out', self.session, [
-            #    (v[0], v[2]) for v in self.messages.values()]
-            self.messages[self.msgid] = op, args, time.time()
+            #print '  call', self.session, self.msgid, async, op
+            if not async:
+                #print '    prev out', self.session, [
+                #    (v[0], v[2]) for v in self.messages.values()]
+                self.messages[self.msgid] = op, args, time.time()
+                self.queueing = True
 
-        self.connection.write(cPickle.dumps((self.msgid, async, op, args)))
+            self.connection.write(cPickle.dumps((self.msgid, async, op, args)))
 
 zz = 0, 0
 
@@ -457,7 +449,7 @@
 
     print '='*70
 
-    print speed, nt*3+nr, ni
+    print speed, nt, nt*3+nr, ni
 
     for op in sorted(handlers.errtimes):
         n, t = handlers.times[op]



More information about the checkins mailing list