[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py *** empty log message ***
Jim Fulton
jim at zope.com
Thu May 26 17:02:46 EDT 2011
Log message for revision 121826:
*** empty log message ***
Changed:
A zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
-=-
Added: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py (rev 0)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-05-26 21:02:45 UTC (rev 121826)
@@ -0,0 +1,450 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Replay ZEO input logs in read-only mode
+"""
+
+
+import cPickle
+import logging
+import marshal
+from multiprocessing import Process, Queue
+# from threading import Thread as Process
+# from Queue import Queue
+import os
+import sys
+import threading
+import time
+import traceback
+import transaction
+import zc.ngi.async
+import zc.ngi.adapters
+import ZODB.TimeStamp
+import ZODB.utils
+
# Check for pending thread switches/signals less often (default is 100
# bytecodes); reduces interpreter overhead for this replay workload.
sys.setcheckinterval(999)

# Default logging setup so warnings/errors from zc.ngi etc. are visible.
logging.basicConfig()
+
def time_stamp(timetime):
    """Convert a time.time()-style float to a ZODB time stamp.

    Returns repr() of a ZODB.TimeStamp.TimeStamp built from the UTC
    broken-down time, with the sub-second fraction folded back into
    the seconds field.  (The repr of a TimeStamp is its raw 8-byte
    form, which main() feeds back into the TimeStamp constructor.)
    """
    parts = time.gmtime(timetime)  # compute once; original called gmtime twice
    # Whole seconds plus the fractional part of the original timestamp.
    seconds = parts[5] + divmod(timetime, 1)[1]
    return repr(ZODB.TimeStamp.TimeStamp(*(parts[:5] + (seconds,))))
+
+class Log(object):
+
+ def __init__(self, fname, ops=None):
+ self.fname = fname
+ self.ops = ops
+
+ def start(self):
+ return iter(self).next()[1]
+
+ _end = None
+ def end(self):
+ if self._end is not None:
+ return self._end
+
+ end = None
+ for x in self:
+ end = x[1]
+
+ self._end = end
+ return end
+
+ def __iter__(self):
+ f = open(self.fname)
+ ops = self.ops
+ while 1:
+ try:
+ session, timetime, message = marshal.load(f)
+ except EOFError:
+ break
+
+ msgid, async, op, args = cPickle.loads(message)
+ if ops and op not in ops:
+ continue
+
+ yield session, timetime, msgid, async, op, args
+
+ def __len__(self):
+ n = 0
+ for x in self:
+ n += 1
+ return n
+
+ def sessions(self):
+ sessions = {}
+ for session, timetime, msgid, async, op, args in self:
+ stats = sessions.get(session)
+ if stats is None:
+ stats = sessions[session] = dict(
+ start_timetime=timetime,
+ ops={},
+ size=0,
+ )
+ stats['end_timetime'] = timetime
+ stats['ops'][op] = stats['ops'].get(op, 0) + 1
+ stats['size'] += 1
+
+ return sessions
+
+ def splitsessions(self, sessions, outname):
+ # Split the given sessions my splitting roughly half the calls
+ # calls into a new session containing only loadEx calls
+ sessions = set("0%s" % session for session in sessions)
+ sizes = {}
+ out = open(outname, 'wb')
+ for session, timetime, msgid, async, op, args in self:
+ session = "0%s" % session
+ if (session in sessions
+ and op == 'loadEx'
+ and sizes.get(session, 0) > sizes.get('1'+session[1:], 0)
+ ):
+ session = '1'+session[1:]
+ sizes[session] = sizes.get(session, 0) + 1
+ marshal.dump(
+ (session, timetime, cPickle.dumps((msgid, async, op, args), 1)
+ ),
+ out)
+
+ def splitsendblobs(self, sessions, outname):
+ # Split the given sessions my splitting roughly half the
+ # sendBlob calls calls into a new session.
+ sessions = set("0%s" % session for session in sessions)
+ sizes = {}
+ out = open(outname, 'wb')
+ for session, timetime, msgid, async, op, args in self:
+ session = "0%s" % session
+ if session in sessions and op == 'sendBlob':
+ if sizes.get(session, 0) > sizes.get('1'+session[1:], 0):
+ session = '1'+session[1:]
+ sizes[session] = sizes.get(session, 0) + 1
+ marshal.dump(
+ (session, timetime, cPickle.dumps((msgid, async, op, args), 1)
+ ),
+ out)
+
class Handler:
    """A single replayed ZEO client session.

    Used as a multiprocessing target: the constructor connects to the
    server at *addr* via zc.ngi and then runs the request loop itself,
    pulling (async, op, args) tuples from *inq*, sending one call at a
    time, and reporting events ('connect', 'request', 'reply',
    'disconnect') to *outq*, which Handlers.run consumes in the parent
    process.  The loop ends when the 'stop' sentinel is read from inq.
    """

    msgid = 0                   # id of the last message sent
    closed = 0
    queueing = True
    # protocol: the server's handshake string once seen (None before);
    # message: the currently outstanding call, [msgid, op, args, start].
    protocol = message = None

    def __init__(self, addr, session, inq, outq):
        self.session = session
        self.addr = addr
        def output(op, *args):
            # Report an event tuple to the parent process.
            outq.put((op, args))
        self.output = output
        self.condition = condition = threading.Condition()
        self.ngi = zc.ngi.async.Implementation()
        self.ngi.connect(addr, self)

        # Request loop: only send the next call once the handshake is
        # complete (protocol set) and no call is outstanding (message
        # cleared by handle_input).  The 1-second queue/condition
        # timeouts keep the loop responsive to reconnects.
        with condition:
            while 1:
                #print '__init__', self.protocol, self.message, inq.empty(),
                if self.protocol and not self.message:
                    try:
                        callargs = inq.get(True, 1.0)
                    except:
                        # Queue.get timed out; re-check state and retry.
                        #print 'queue timeout'
                        continue
                    if callargs == 'stop':
                        break
                    async, op, args = callargs
                    assert not async
                    self.call(op, args)
                condition.wait(1)

    def call(self, op, args):
        """Send one synchronous call and record when it was written."""
        assert not self.message
        self.msgid += 1
        #print time.ctime(), 'call', self.msgid, op, args
        self.message = [self.msgid, op, args]
        data = cPickle.dumps((self.msgid, 0, op, args))
        append = self.message.append

        # writelines takes an iterable; the generator body runs when
        # the connection actually consumes it, so time.time() is
        # appended to self.message at (approximately) send time.
        # (Python 2's built-in `apply` calls the function, turning the
        # decorated def into the generator that writelines receives.)
        @self.connection.writelines
        @apply
        def _():
            append(time.time())
            yield data

        self.output('request', op, args)

    def connected(self, connection):
        # zc.ngi callback: TCP connection established; wait for the
        # server handshake (handled in handle_input) before calling.
        self.output('connect')
        self.protocol = None
        self.connection = zc.ngi.adapters.Sized(connection)
        self.connection.set_handler(self)

    def failed_connect(self, reason):
        # zc.ngi callback: the connection attempt itself failed.
        print time.ctime(), self.session, 'WTF failed connect', reason

    def handle_close(self, connection, reason):
        # zc.ngi callback: connection lost.  Abandon any outstanding
        # call, tell the parent, and reconnect.
        with self.condition:
            self.protocol = self.message = None

        self.connection = None
        print time.ctime(), self.session, 'WTF Closed', reason
        self.output('disconnect', 0)
        self.ngi.connect(self.addr, self)

    def handle_input(self, connection, message):
        """zc.ngi callback: a complete sized message arrived."""
        now = time.time()
        try:
            msgid, flags, op, args = cPickle.loads(message)
        except:
            # Not a pickle: presumably the ZEO protocol handshake,
            # which starts with 'Z' -- anything else is garbage.
            if message[0] == 'Z':
                with self.condition:
                    if self.protocol is None:
                        connection.write(message) # Echo it back
                        # Register for storage '1'; the second value
                        # looks like a read-only flag -- TODO confirm
                        # against the ZEO protocol.
                        self.call('register', ('1', 1))
                    self.protocol = message
                    self.condition.notifyAll()
                return

            print time.ctime(), self.session, 'bad message', repr(message)
            traceback.print_exception(*sys.exc_info())
            return

        # Server-initiated invalidations are irrelevant to replay.
        if op == 'invalidateTransaction':
            return
        #print time.ctime(), 'input', msgid, op
        if (op == '.reply'):
            ret = args
            with self.condition:
                # self.message is [msgid, op, args, send-time]; the
                # send time was appended by the writelines generator.
                rmsgid, op, args, start = self.message
                assert rmsgid == msgid
                elapsed = now-start
                #print elapsed * 1000
                self.message = None
                #print 'notift reply'
                self.condition.notifyAll()

            self.output('reply', op, args, ret, elapsed)
+
+zz = 0, 0
+
+class Handlers:
+
+ async = abandoned = active = 0
+
+ def __init__(self, disconnected):
+ self.errtimes = {}
+ self.times = {}
+ self.disconnected = disconnected
+ self.connected = self.maxactive = self.calls = self.replies = 0
+ self.errors = 0
+ self.event = threading.Event()
+
+ def __repr__(self):
+ return ("%(connected)s %(disconnected)s %(maxactive)s"
+ " %(calls)s %(replies)s %(errors)s"
+ % self.__dict__)
+
+ def run(self, queue):
+ while 1:
+ got = queue.get()
+ op, args = got
+ getattr(self, op)(*args)
+
+ def connect(self):
+ self.disconnected -= 1
+ if not self.disconnected:
+ self.event.set()
+ self.connected += 1
+
+ def disconnect(self, abandoned):
+ self.disconnected += 1
+ self.connected -= 1
+ self.abandoned += abandoned
+ self.active -= abandoned
+
+ def request(self, op, args):
+ self.active += 1
+ self.maxactive = max(self.maxactive, self.active)
+ self.calls += 1
+
+ def reply(self, op, args, ret, elapsed):
+ self.active -= 1
+ self.replies += 1
+ if (isinstance(ret, tuple)
+ and len(ret) == 2
+ and isinstance(ret[1], Exception)
+ ):
+ n, t = self.errtimes.get(op, zz)
+ self.errtimes[op] = n+1, t+elapsed
+ #print ' OOPS', op, args, elapsed, ret[0].__name__, ret[1]
+ self.errors += 1
+ else:
+ n, t = self.times.get(op, zz)
+ self.times[op] = n+1, t+elapsed
+ sys.stdout.flush()
+
+
+
def parse_addr(addr):
    """Parse a 'host:port' string into a (host, int(port)) pair."""
    parts = addr.split(':')
    return parts[0], int(parts[1])
+
+def print_times(last_times, times, label):
+ print 'Time per op (milliseconds)', label
+ times = times.copy()
+ for op in sorted(times):
+ n, t = times[op]
+ last = last_times.get(op)
+ if last:
+ n -= last[0]
+ t -= last[1]
+ if n:
+ print "%20s %10d %10.3f" % (op, n, t*1000/n)
+
+ return times
+
+def main(args=None):
+ """Usage: script address log source
+
+ Where:
+
+ address
+ The address of the zeo server to provide input to.
+
+ log
+ zeo input log
+
+ """
+ if args is None:
+ args = sys.argv[1:]
+
+
+ # Maybe add options for following:
+ singe_threaded = False
+ simulate_ssd = False
+
+ print "$Id$"
+ print args
+ addr = args.pop(0)
+ log = args.pop(0)
+ if args:
+ max_records = int(args.pop(0))
+ else:
+ max_records = 999999999
+ assert not args
+ addr = parse_addr(addr)
+
+ log = Log(log, set(('loadEx', 'loadBefore', 'sendBlob')))
+
+ # Set up the client connections
+ sessions = {}
+ nhandlers = 0
+ handlers_queue = Queue()
+ for session, timetime, msgid, async, op, args in log:
+ if singe_threaded:
+ session = '1'
+ if session not in sessions:
+ handler_queue = Queue()
+ process = Process(
+ target = Handler,
+ args = (addr, nhandlers, handler_queue, handlers_queue),
+ )
+ process.daemon = True
+ process.start()
+ sessions[session] = handler_queue
+ nhandlers += 1
+
+ nsessions = len(sessions)
+ handlers = Handlers(nsessions)
+ thread = threading.Thread(target=handlers.run, args=(handlers_queue, ))
+ thread.setDaemon(True)
+ thread.start()
+
+ handlers.event.wait(10)
+ if not handlers.event.is_set():
+ raise ValueError("Couldn't connect.", handlers)
+
+ # Now, we're ready to replay.
+ nrecords = 0
+ start = lastnow = time.time()
+ firsttt = lasttt = log.start()
+ speed = speed1 = None
+ last_times = {}
+ for session, timetime, msgid, async, op, args in log:
+
+ if singe_threaded:
+ session = '1'
+
+ if simulate_ssd:
+ if op == 'loadEx':
+ args = (ZODB.utils.z64,)
+ else:
+ assert op in ('loadBefore', 'sendBlob')
+ continue
+
+ nwaaa = 0
+ while nrecords-handlers.replies-handlers.errors > nsessions*3:
+ if nwaaa and nwaaa%1000 == 0:
+ print 'waiting', nrecords, handlers.replies, handlers.errors,
+ print nrecords-handlers.replies-handlers.errors
+ sys.stdout.flush()
+
+ time.sleep(.01)
+ nwaaa += 1
+
+ if nrecords and (nrecords%10000 == 0):
+ if (nrecords%100000 == 0):
+ last_times = print_times(last_times, handlers.times,
+ "after %s operations" % nrecords)
+
+ now = time.time()
+ if now > start:
+ speed = (timetime-firsttt) / (now-start)
+ if now > lastnow:
+ speed1 = (timetime-lasttt) / (now-lastnow)
+ lastnow = now
+ lasttt = timetime
+
+ print nrecords,
+ print time.strftime('%H:%M:%S', time.localtime(time.time())),
+ print ZODB.TimeStamp.TimeStamp(time_stamp(timetime)), handlers,
+ print speed, speed1, handlers.active, handlers_queue.qsize()
+
+ sys.stdout.flush()
+
+
+ nrecords += 1
+
+ handlers.maxactive = 0
+
+
+ sessions[session].put((async, op, args))
+ if nrecords >= max_records:
+ break
+
+ print '='*70
+
+ print speed, nrecords
+
+ for op in sorted(handlers.errtimes):
+ n, t = handlers.times[op]
+ print 'err', op, n, t/n
+
+ print_times(last_times, handlers.times,
+ "after %s transactions" % nrecords)
+
+ print_times({}, handlers.times, "overall")
+ sys.stdout.flush()
+
Property changes on: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
More information about the checkins
mailing list