[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py *** empty log message ***
Jim Fulton
jim at zope.com
Mon Sep 28 13:03:45 EDT 2009
Log message for revision 104590:
*** empty log message ***
Changed:
U zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py 2009-09-28 13:27:58 UTC (rev 104589)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py 2009-09-28 17:03:45 UTC (rev 104590)
@@ -53,8 +53,11 @@
import sys
import threading
import time
+import traceback
+import transaction
import zc.ngi.async
import zc.ngi.adapters
+import ZEO.ClientStorage
import ZODB.blob
import ZODB.FileStorage
import ZODB.POSException
@@ -106,6 +109,29 @@
marshal.dump(('c',), out)
out.close()
+def mergelogs(l1, l2, out):
+    """Merge two marshalled logs into one file ordered by record key.
+
+    l1, l2 -- paths of the input logs; each is read as a sequence of
+              marshal.dump()ed records and is assumed to already be
+              sorted on its records' second item (index 1).
+    out    -- path of the merged log to write.
+
+    Records are ordered on their second item; on a tie the record from
+    l1 is written first.  An empty input log is accepted, and all three
+    files are closed before returning, even when an error occurs.
+    """
+    files = []
+    fo = None
+    try:
+        for name in (l1, l2):
+            files.append(open(name, 'rb'))
+
+        # One pending record per input; None marks an exhausted log.
+        records = []
+        for f in files:
+            try:
+                records.append(marshal.load(f))
+            except EOFError:
+                records.append(None)
+
+        fo = open(out, 'wb')
+        while records[0] is not None or records[1] is not None:
+            if records[1] is None:
+                index = 0
+            elif records[0] is None:
+                index = 1
+            elif records[0][1] <= records[1][1]:
+                index = 0
+            else:
+                index = 1
+            marshal.dump(records[index], fo)
+            try:
+                records[index] = marshal.load(files[index])
+            except EOFError:
+                records[index] = None
+    finally:
+        # Closing the output explicitly guarantees the merged data is
+        # flushed to disk instead of relying on garbage collection.
+        if fo is not None:
+            fo.close()
+        for f in files:
+            f.close()
+
class Log(object):
def __init__(self, fname):
@@ -118,7 +144,7 @@
def end(self):
if self._end is not None:
return self._end
-
+
end = None
for x in self:
end = x[1]
@@ -138,76 +164,175 @@
yield session, timetime, msgid, async, op, args
+class Transactions(object):
+    """Iterable over the transactions stored in a marshalled extract file.
+
+    Iterating yields one Transaction for each header record read from the
+    file named by fname.  Every Transaction reads its data records lazily
+    from that same open file, so it has to be iterated to completion
+    before the next one is requested; otherwise the next header read
+    lands on a data record.
+    """
+
+    def __init__(self, fname):
+        self.fname = fname
+
+    def __iter__(self):
+        # marshal data is binary, so open with 'rb' (as mergelogs does for
+        # its inputs) instead of text mode, and close the file once
+        # iteration finishes or the generator is discarded.
+        f = open(self.fname, 'rb')
+        try:
+            while 1:
+                try:
+                    header = marshal.load(f)
+                except EOFError:
+                    # End of file: no more transactions.
+                    break
+                yield Transaction(f, *header)
+        finally:
+            f.close()
+
+
+class Transaction:
+    """One transaction header plus lazy access to its data records.
+
+    Built from the fields of a 't' record that was read from f.  Iterating
+    reads the following records from f and yields each one with its
+    leading type tag removed, stopping once a 'c' record has been read.
+    The records can be consumed only once; iterating again yields nothing.
+    """
+
+    # Becomes True on the instance the first time it is iterated.
+    used = False
+
+    def __init__(self, f, type_, tid, status, user, description, extension):
+        assert type_ == 't'
+        self.f = f
+        self.id, self.status = tid, status
+        self.user, self.description = user, description
+        self._extension = extension
+
+    def __iter__(self):
+        if self.used:
+            return
+        self.used = True
+        record = marshal.load(self.f)
+        while record[0] != 'c':
+            yield record[1:]
+            record = marshal.load(self.f)
+
class Handler:
- protocol = None
msgid = 0
+ closed = 0
+ queueing = True
- def __init__(self):
+ def __init__(self, session, addr, handlers):
+ self.handlers = handlers
+ self.session = session
+ self.addr = addr
self.event = threading.Event()
self.queue = []
- self.message = {}
- self.errtimes = []
- self.times = []
+ self.messages = {}
+ self.times = handlers.times
+ self.errtimes = handlers.errtimes
+ self.lock = threading.Lock()
+ zc.ngi.async.connect(addr, self)
def connected(self, connection):
+ self.handlers.connected += 1
+ self.protocol = None
self.connection = zc.ngi.adapters.Sized(connection)
- connection.set_handler(self)
- self.event.set()
+ self.connection.setHandler(self)
def failed_connect(self, reason):
- print 'WTF failed connect', reason
+ print time.ctime(), self.session, 'WTF failed connect', reason
- def handle_close(self, reason):
- print 'WTF Closed', reason
+ def handle_close(self, connection, reason):
+ with self.lock:
+ self.queueing = True
- def handle_input(self, message):
+ self.connection = None
+ print time.ctime(), self.session, 'WTF Closed', reason
+ messages = sorted((v[2], v[0], v[1]) for v in self.messages.values())
+ print time.ctime(), self.session, [v[:2] for v in messages]
+ redo = [(0, v[1], v[2]) for v in messages
+ if v[1] in ('sendBlob', 'loadEx')]
+ self.queue.extend(redo)
+ self.handlers.abandoned += len(messages) - len(redo)
+ self.messages.clear()
+ zc.ngi.async.connect(self.addr, self)
+ self.handlers.connected -= 1
+
+ def stop_queueing(self):
+ assert self.queueing
+ with self.lock:
+ self.queueing = False
+ queue = self.queue
+ while queue and not self.queueing:
+ self._call(*queue.pop(0))
+
+ def handle_input(self, connection, message):
if self.protocol is None:
self.protocol = message
- self.connection.write(self.protocol) # Echo it back
- self.call(0, 'register', ('1', 0))
+ connection.write(self.protocol) # Echo it back
+ self._call(1, 'set_log_label_from_client',
+ ('handler:%s' % self.session, ))
+ self._call(0, 'register', ('1', 0))
+ self.stop_queueing()
self.event.set()
return
- now = time.time()
- msgid, flags, op, args = cPickle.loads(message)
+ self.handlers.inputs += 1
+
+ try:
+ msgid, flags, op, args = cPickle.loads(message)
+ except:
+ print time.ctime(), self.session, 'bad message', repr(message)
+ traceback.print_exception(*sys.exc_info())
+ return
+
+ print ' got', self.session, msgid, flags, op
if (op == '.reply'):
ret = args
- op, args, start, blob = self.messages.pop(msgid)
+ op, args, start = self.messages.pop(msgid)
+ elapsed = time.time()-start
+ print ' reply', op, [
+ (v[0], v[2]) for v in self.messages.values()
+ ], elapsed
+ self.handlers.replies += 1
if (isinstance(ret, tuple)
and len(ret) == 2
and isinstance(ret[1], Exception)
):
- err = ret[1]
- self.errtimes.append(now-start)
- if isinstance(err, ZODB.POSException.POSKeyError):
-# if op == 'sendBlob':
-# # Hm. May be due to a bad serial.
-# # queue a
- self.queue.append((op, args))
- return
- print 'OOPS', ret[0].__name__, ret[1]
- # Maybe we should retry messages that generate errors.
+ n, t = self.errtimes.get(op, zz)
+ self.errtimes[op] = n+1, t+elapsed
+ print ' OOPS', op, args, elapsed, ret[0].__name__, ret[1]
+ self.handlers.errors += 1
else:
- self.times.append(now-start)
- if blob:
- assert op == 'loadEx'
- oid = args[0]
- tid = ret[1]
- self.queue.append('sendBlob', (oid, tid))
- else:
- if op == 'receiveBlobStop':
- key = args
- start = self.messages.pop(key)
- self.times.append(now-start)
+ n, t = self.times.get(op, zz)
+ self.times[op] = n+1, t+elapsed
- def call(self, async, op, args):
+ if op == 'vote':
+ self.stop_queueing()
+
+ def call(self, *args):
+ with self.lock:
+ self._call(*args)
+
+ def _call(self, async, op, args, processing_queue=False):
+ if self.queueing:
+ self.queue.append((async, op, args))
+ return
+
+ if op == 'vote':
+ print self.session, 'vote'
+ self.voting = self.queueing = True
+ if not processing_queue:
+ self.queue.insert(0, (0, 'tpc_abort', args))
+ else:
+ assert self.queue[0][1] == 'tpc_abort'
+
self.msgid += 1
+ print ' call', self.session, self.msgid, async, op
if not async:
- self.messages[msgid] = op, args, time.time(), 0
+ #print ' prev out', self.session, [
+ # (v[0], v[2]) for v in self.messages.values()]
+ self.messages[self.msgid] = op, args, time.time()
- self.connection.write(cPickle.dumps((msgid, async, op, args)))
+ self.connection.write(cPickle.dumps((self.msgid, async, op, args)))
+# Starting (count, total elapsed) pair for an op that has no timings yet;
+# Handler passes it as the dict.get() default when accumulating per-op times.
+zz = 0, 0
+
+class Handlers:
+    """Counters and per-op timing tables shared by all Handler objects.
+
+    times and errtimes map an op name to a (count, total elapsed) pair for
+    successful and failed replies respectively.  The integer counters
+    start at zero and are updated by Handler and by the replay loop;
+    pending is not updated anywhere in the code visible here.
+    """
+
+    # Class-level zero defaults for every counter.
+    connected = 0
+    async = 0
+    calls = 0
+    inputs = 0
+    replies = 0
+    errors = 0
+    pending = 0
+    abandoned = 0
+
+    def __init__(self):
+        self.times = {}
+        self.errtimes = {}
+
def parse_addr(addr):
    """Split a 'host:port' string into a (host, port) tuple; port is an int.

    Only the first two colon-separated fields are used.
    """
    parts = addr.split(':')
    host = parts[0]
    return host, int(parts[1])
@@ -225,8 +350,8 @@
source
- A file-storage file that contains just the transaction
- corresponding to the input log
+ A marshalled extract of file-storage records for the period of the
+ input log.
"""
if args is None:
@@ -235,13 +360,18 @@
[addr, log, source] = args
addr = parse_addr(addr)
+ log = Log(log)
+
+ handlers = Handlers()
+
# Set up the client connections
sessions = {}
- for session, timetime, msgid, async, op, args in Log(log):
+ nhandlers = 0
+ for session, timetime, msgid, async, op, args in log:
if session not in sessions:
- handler = Handler()
+ handler = Handler(nhandlers, addr, handlers)
sessions[session] = handler
- zc.ngi.async.connect(addr, handler)
+ nhandlers += 1
for handler in sessions.values():
handler.event.wait(10)
@@ -249,7 +379,66 @@
raise ValueError("Couldn't connect.")
# Now, we're ready to replay.
- for session, timetime, msgid, async, op, args in Log(log):
- if op in ('getAuthProtocol', 'register'):
- continue
- sessions[session].call(async, op, args)
+
+ cs = ZEO.ClientStorage.ClientStorage(addr)
+ logiter = iter(log)
+ logrecord = logiter.next()
+ nt = nr = ni = 0
+ start = lastnow = time.time()
+ firsttt = lasttt = log.start()
+ work = lastwork = 0
+ speed = speed1 = None
+ for t in Transactions(source):
+ nt += 1
+ tt = ZODB.TimeStamp.TimeStamp(t.id).timeTime()
+ pending = handlers.calls - handlers.replies - handlers.abandoned
+ now = time.time()
+ if now > start:
+ speed = (tt-firsttt) / (now-start)
+ if now > lastnow:
+ speed1 = (tt-lasttt) / (now-lastnow)
+ lastnow = now
+ lasttt = tt
+ work = nt + nr + handlers.calls + handlers.async
+ print '=== top', time.ctime(), ZODB.TimeStamp.TimeStamp(time_stamp(tt))
+ print ' ', handlers.connected, handlers.calls, handlers.replies,
+ print handlers.errors, handlers.async, pending, speed, speed1
+ while logrecord[1] < tt:
+ ni += 1
+ session, _, _, async, op, args = logrecord
+ logrecord = logiter.next()
+ if op in ('getAuthProtocol', 'register', 'tpc_finish'):
+ continue
+ sessions[session].call(async, op, args)
+ if async:
+ handlers.async += 1
+ else:
+ handlers.calls += 1
+ #print op, args
+ if op == 'vote':
+ handlers.async += 1
+
+ #print '=== begin'
+ cs.tpc_begin(t, t.id, t.status)
+ for oid, serial, data in t:
+ if not data:
+ continue
+ nr += 1
+ cs.restore(oid, serial, data, '', None, t)
+ #print '=== vote'
+ cs.tpc_vote(t)
+ #print '=== finish'
+ cs.tpc_finish(t)
+
+ print '='*70
+
+ print speed, work
+
+ for op in sorted(handlers.errtimes):
+ # Error timings are stored in errtimes.  Looking the op up in
+ # handlers.times raised KeyError for ops that only ever failed and
+ # otherwise printed the success statistics under the 'err' label.
+ n, t = handlers.errtimes[op]
+ print 'err', op, n, t/n
+
+ for op in sorted(handlers.times):
+ n, t = handlers.times[op]
+ print op, n, t/n
+
More information about the checkins
mailing list