[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py *** empty log message ***

Jim Fulton jim at zope.com
Mon Sep 28 13:03:45 EDT 2009


Log message for revision 104590:
  *** empty log message ***

Changed:
  U   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py

-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py	2009-09-28 13:27:58 UTC (rev 104589)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py	2009-09-28 17:03:45 UTC (rev 104590)
@@ -53,8 +53,11 @@
 import sys
 import threading
 import time
+import traceback
+import transaction
 import zc.ngi.async
 import zc.ngi.adapters
+import ZEO.ClientStorage
 import ZODB.blob
 import ZODB.FileStorage
 import ZODB.POSException
@@ -106,6 +109,29 @@
         marshal.dump(('c',), out)
     out.close()
 
+def mergelogs(l1, l2, out):
+    files = open(l1, 'rb'), open(l2, 'rb')
+    records = [marshal.load(f) for f in files]
+    fo = open(out, 'wb')
+    while 1:
+        if records[0][1] <= records[1][1]:
+            index = 0
+        else:
+            index = 1
+        marshal.dump(records[index], fo)
+        try:
+            records[index] = marshal.load(files[index])
+        except EOFError:
+            break
+
+    index = not index
+    while 1:
+        marshal.dump(records[index], fo)
+        try:
+            records[index] = marshal.load(files[index])
+        except EOFError:
+            break
+
 class Log(object):
 
     def __init__(self, fname):
@@ -118,7 +144,7 @@
     def end(self):
         if self._end is not None:
             return self._end
-        
+
         end = None
         for x in self:
             end = x[1]
@@ -138,76 +164,175 @@
 
             yield session, timetime, msgid, async, op, args
 
+class Transactions(object):
+
+
+    def __init__(self, fname):
+        self.fname = fname
+
+    def __iter__(self):
+        f = open(self.fname)
+        while 1:
+            try:
+                yield Transaction(f, *marshal.load(f))
+            except EOFError:
+                break
+
+
+class Transaction:
+
+    def __init__(self, f, type_, tid, status, user, description, extension):
+        assert type_ == 't'
+        self.f = f
+        self.id = tid
+        self.status = status
+        self.user = user
+        self.description = description
+        self._extension = extension
+
+    used = False
+    def __iter__(self):
+        if self.used:
+            return
+        self.used = True
+        while 1:
+            data = marshal.load(self.f)
+            if data[0] == 'c':
+                break
+            yield data[1:]
+
 class Handler:
 
-    protocol = None
     msgid = 0
+    closed = 0
+    queueing = True
 
-    def __init__(self):
+    def __init__(self, session, addr, handlers):
+        self.handlers = handlers
+        self.session = session
+        self.addr = addr
         self.event = threading.Event()
         self.queue = []
-        self.message = {}
-        self.errtimes = []
-        self.times = []
+        self.messages = {}
+        self.times = handlers.times
+        self.errtimes = handlers.errtimes
+        self.lock = threading.Lock()
+        zc.ngi.async.connect(addr, self)
 
     def connected(self, connection):
+        self.handlers.connected += 1
+        self.protocol = None
         self.connection = zc.ngi.adapters.Sized(connection)
-        connection.set_handler(self)
-        self.event.set()
+        self.connection.setHandler(self)
 
     def failed_connect(self, reason):
-        print 'WTF failed connect', reason
+        print time.ctime(), self.session, 'WTF failed connect', reason
 
-    def handle_close(self, reason):
-        print 'WTF Closed', reason
+    def handle_close(self, connection, reason):
+        with self.lock:
+            self.queueing = True
 
-    def handle_input(self, message):
+        self.connection = None
+        print time.ctime(), self.session, 'WTF Closed', reason
+        messages = sorted((v[2], v[0], v[1]) for v in self.messages.values())
+        print time.ctime(), self.session, [v[:2] for v in messages]
+        redo = [(0, v[1], v[2]) for v in messages
+                if v[1] in ('sendBlob', 'loadEx')]
+        self.queue.extend(redo)
+        self.handlers.abandoned += len(messages) - len(redo)
+        self.messages.clear()
+        zc.ngi.async.connect(self.addr, self)
+        self.handlers.connected -= 1
+
+    def stop_queueing(self):
+        assert self.queueing
+        with self.lock:
+            self.queueing = False
+            queue = self.queue
+            while queue and not self.queueing:
+                self._call(*queue.pop(0))
+
+    def handle_input(self, connection, message):
         if self.protocol is None:
             self.protocol = message
-            self.connection.write(self.protocol) # Echo it back
-            self.call(0, 'register', ('1', 0))
+            connection.write(self.protocol) # Echo it back
+            self._call(1, 'set_log_label_from_client',
+                       ('handler:%s' % self.session, ))
+            self._call(0, 'register', ('1', 0))
+            self.stop_queueing()
             self.event.set()
             return
 
-        now = time.time()
-        msgid, flags, op, args = cPickle.loads(message)
+        self.handlers.inputs += 1
+
+        try:
+            msgid, flags, op, args = cPickle.loads(message)
+        except:
+            print time.ctime(), self.session, 'bad message', repr(message)
+            traceback.print_exception(*sys.exc_info())
+            return
+
+        print '  got', self.session, msgid, flags, op
         if (op == '.reply'):
             ret = args
-            op, args, start, blob = self.messages.pop(msgid)
+            op, args, start = self.messages.pop(msgid)
+            elapsed = time.time()-start
+            print '    reply', op, [
+                (v[0], v[2]) for v in self.messages.values()
+                ], elapsed
+            self.handlers.replies += 1
             if (isinstance(ret, tuple)
                 and len(ret) == 2
                 and isinstance(ret[1], Exception)
                 ):
-                err = ret[1]
-                self.errtimes.append(now-start)
-                if isinstance(err, ZODB.POSException.POSKeyError):
-#                     if op == 'sendBlob':
-#                         # Hm. May be due to a bad serial.
-#                         # queue a
-                    self.queue.append((op, args))
-                    return
-                print 'OOPS', ret[0].__name__, ret[1]
-                # Maybe we should retry messages that generate errors.
+                n, t = self.errtimes.get(op, zz)
+                self.errtimes[op] = n+1, t+elapsed
+                print '  OOPS', op, args, elapsed, ret[0].__name__, ret[1]
+                self.handlers.errors += 1
             else:
-                self.times.append(now-start)
-                if blob:
-                    assert op == 'loadEx'
-                    oid = args[0]
-                    tid = ret[1]
-                    self.queue.append('sendBlob', (oid, tid))
-        else:
-            if op == 'receiveBlobStop':
-                key = args
-                start = self.messages.pop(key)
-                self.times.append(now-start)
+                n, t = self.times.get(op, zz)
+                self.times[op] = n+1, t+elapsed
 
-    def call(self, async, op, args):
+            if op == 'vote':
+                self.stop_queueing()
+
+    def call(self, *args):
+        with self.lock:
+            self._call(*args)
+
+    def _call(self, async, op, args, processing_queue=False):
+        if self.queueing:
+            self.queue.append((async, op, args))
+            return
+
+        if op == 'vote':
+            print self.session, 'vote'
+            self.voting = self.queueing = True
+            if not processing_queue:
+                self.queue.insert(0, (0, 'tpc_abort', args))
+            else:
+                assert self.queue[0][1] == 'tpc_abort'
+
         self.msgid += 1
+        print '  call', self.session, self.msgid, async, op
         if not async:
-            self.messages[msgid] = op, args, time.time(), 0
+            #print '    prev out', self.session, [
+            #    (v[0], v[2]) for v in self.messages.values()]
+            self.messages[self.msgid] = op, args, time.time()
 
-        self.connection.write(cPickle.dumps((msgid, async, op, args)))
+        self.connection.write(cPickle.dumps((self.msgid, async, op, args)))
 
+zz = 0, 0
+
+class Handlers:
+
+    connected = async = calls = inputs = replies = errors = pending = 0
+    abandoned = 0
+
+    def __init__(self):
+        self.errtimes = {}
+        self.times = {}
+
 def parse_addr(addr):
     addr = addr.split(':')
     return addr[0], int(addr[1])
@@ -225,8 +350,8 @@
 
        source
 
-          A file-storage file that contains just the transaction
-          corresponding to the input log
+          A marshalled extract of file-storage records for the period of the
+          input log.
 
     """
     if args is None:
@@ -235,13 +360,18 @@
     [addr, log, source] = args
     addr = parse_addr(addr)
 
+    log = Log(log)
+
+    handlers = Handlers()
+
     # Set up the client connections
     sessions = {}
-    for session, timetime, msgid, async, op, args in Log(log):
+    nhandlers = 0
+    for session, timetime, msgid, async, op, args in log:
         if session not in sessions:
-            handler = Handler()
+            handler = Handler(nhandlers, addr, handlers)
             sessions[session] = handler
-            zc.ngi.async.connect(addr, handler)
+            nhandlers += 1
 
     for handler in sessions.values():
         handler.event.wait(10)
@@ -249,7 +379,66 @@
             raise ValueError("Couldn't connect.")
 
     # Now, we're ready to replay.
-    for session, timetime, msgid, async, op, args in Log(log):
-        if op in ('getAuthProtocol', 'register'):
-            continue
-        sessions[session].call(async, op, args)
+
+    cs = ZEO.ClientStorage.ClientStorage(addr)
+    logiter = iter(log)
+    logrecord = logiter.next()
+    nt = nr = ni = 0
+    start = lastnow = time.time()
+    firsttt = lasttt = log.start()
+    work = lastwork = 0
+    speed = speed1 = None
+    for t in Transactions(source):
+        nt += 1
+        tt = ZODB.TimeStamp.TimeStamp(t.id).timeTime()
+        pending = handlers.calls - handlers.replies - handlers.abandoned
+        now = time.time()
+        if now > start:
+            speed = (tt-firsttt) / (now-start)
+        if now > lastnow:
+            speed1 = (tt-lasttt) / (now-lastnow)
+        lastnow = now
+        lasttt = tt
+        work = nt + nr + handlers.calls + handlers.async
+        print '=== top', time.ctime(), ZODB.TimeStamp.TimeStamp(time_stamp(tt))
+        print '       ', handlers.connected, handlers.calls, handlers.replies,
+        print handlers.errors, handlers.async, pending, speed, speed1
+        while logrecord[1] < tt:
+            ni += 1
+            session, _, _, async, op, args = logrecord
+            logrecord = logiter.next()
+            if op in ('getAuthProtocol', 'register', 'tpc_finish'):
+                continue
+            sessions[session].call(async, op, args)
+            if async:
+                handlers.async += 1
+            else:
+                handlers.calls += 1
+            #print op, args
+            if op == 'vote':
+                handlers.async += 1
+
+        #print '=== begin'
+        cs.tpc_begin(t, t.id, t.status)
+        for oid, serial, data in t:
+            if not data:
+                continue
+            nr += 1
+            cs.restore(oid, serial, data, '', None, t)
+        #print '=== vote'
+        cs.tpc_vote(t)
+        #print '=== finish'
+        cs.tpc_finish(t)
+
+    print '='*70
+
+    print speed, work
+
+    for op in sorted(handlers.errtimes):
+        n, t = handlers.times[op]
+        print 'err', op, n, t/n
+
+    for op in sorted(handlers.times):
+        n, t = handlers.times[op]
+        print op, n, t/n
+



More information about the checkins mailing list