[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py *** empty log message ***

Jim Fulton jim at zope.com
Thu May 26 17:02:46 EDT 2011


Log message for revision 121826:
  *** empty log message ***

Changed:
  A   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py

-=-
Added: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	                        (rev 0)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-05-26 21:02:45 UTC (rev 121826)
@@ -0,0 +1,450 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Replay ZEO input logs in read-only mode
+"""
+
+
+import cPickle
+import logging
+import marshal
+from multiprocessing import Process, Queue
+# from threading import Thread as Process
+# from Queue import Queue
+import os
+import sys
+import threading
+import time
+import traceback
+import transaction
+import zc.ngi.async
+import zc.ngi.adapters
+import ZODB.TimeStamp
+import ZODB.utils
+
+sys.setcheckinterval(999)
+
+logging.basicConfig()
+
def time_stamp(timetime):
    """Convert a seconds-since-epoch float to a raw 8-byte ZODB time stamp.

    The integral part of *timetime* supplies (year, month, day, hour,
    minute) via gmtime, and the fractional part is folded back into the
    seconds field so sub-second resolution is preserved.  Returns the
    repr of the ZODB.TimeStamp.TimeStamp (its raw bytes under Python 2).
    """
    # Hoist the gmtime() call -- the original computed it twice for the
    # same input value.
    parts = time.gmtime(timetime)
    seconds = parts[5] + (timetime % 1)  # divmod(x, 1)[1] == x % 1
    return repr(ZODB.TimeStamp.TimeStamp(*(parts[:5] + (seconds,))))
+
class Log(object):
    """A ZEO input log.

    The log file is a sequence of marshalled (session, timetime, message)
    records, where each message is a pickled (msgid, async, op, args)
    tuple.  If *ops* is given, iteration yields only records whose
    operation name is in that collection.
    """

    def __init__(self, fname, ops=None):
        self.fname = fname
        self.ops = ops

    def start(self):
        """Return the timetime of the first (matching) record."""
        # next() builtin works on Python 2.6+ and 3.x; the original used
        # the Python-2-only .next() iterator method.
        return next(iter(self))[1]

    _end = None  # cached result of end()
    def end(self):
        """Return the timetime of the last (matching) record (cached)."""
        if self._end is not None:
            return self._end

        end = None
        for record in self:
            end = record[1]

        self._end = end
        return end

    def __iter__(self):
        """Yield (session, timetime, msgid, async, op, args) tuples."""
        # Open in binary mode: marshal data is binary, and the original
        # text-mode open would corrupt it on Windows.  The with-statement
        # also closes the file, which the original never did.
        with open(self.fname, 'rb') as f:
            ops = self.ops
            while 1:
                try:
                    session, timetime, message = marshal.load(f)
                except EOFError:
                    break

                # Local renamed from "async" (a keyword in Python 3).
                msgid, is_async, op, args = cPickle.loads(message)
                if ops and op not in ops:
                    continue

                yield session, timetime, msgid, is_async, op, args

    def __len__(self):
        """Number of (matching) records; requires a full scan."""
        n = 0
        for record in self:
            n += 1
        return n

    def sessions(self):
        """Return {session: stats}: per-session op counts, record count
        ('size'), and the first/last timetime seen for the session."""
        sessions = {}
        for session, timetime, msgid, is_async, op, args in self:
            stats = sessions.get(session)
            if stats is None:
                stats = sessions[session] = dict(
                    start_timetime=timetime,
                    ops={},
                    size=0,
                    )
            stats['end_timetime'] = timetime
            stats['ops'][op] = stats['ops'].get(op, 0) + 1
            stats['size'] += 1

        return sessions

    def splitsessions(self, sessions, outname):
        """Split the given sessions by moving roughly half of their calls
        into a new session containing only loadEx calls, writing the
        rewritten log to *outname*.
        """
        sessions = set("0%s" % session for session in sessions)
        sizes = {}
        with open(outname, 'wb') as out:
            for session, timetime, msgid, is_async, op, args in self:
                session = "0%s" % session
                # Divert loadEx calls to the '1'-prefixed twin session
                # whenever it is smaller than the '0'-prefixed one, so the
                # two stay roughly balanced.
                if (session in sessions
                    and op == 'loadEx'
                    and sizes.get(session, 0) > sizes.get('1' + session[1:], 0)
                    ):
                    session = '1' + session[1:]
                sizes[session] = sizes.get(session, 0) + 1
                marshal.dump(
                    (session, timetime,
                     cPickle.dumps((msgid, is_async, op, args), 1)),
                    out)

    def splitsendblobs(self, sessions, outname):
        """Split the given sessions by moving roughly half of their
        sendBlob calls into a new session, writing the rewritten log to
        *outname*.
        """
        sessions = set("0%s" % session for session in sessions)
        sizes = {}
        with open(outname, 'wb') as out:
            for session, timetime, msgid, is_async, op, args in self:
                session = "0%s" % session
                # Note: unlike splitsessions, sizes are only tracked for
                # sendBlob records in the selected sessions (as before).
                if session in sessions and op == 'sendBlob':
                    if sizes.get(session, 0) > sizes.get('1' + session[1:], 0):
                        session = '1' + session[1:]
                    sizes[session] = sizes.get(session, 0) + 1
                marshal.dump(
                    (session, timetime,
                     cPickle.dumps((msgid, is_async, op, args), 1)),
                    out)
+
class Handler:
    """A single replayed ZEO client session (run as a subprocess target).

    Connects to the server at *addr* with zc.ngi, performs the ZEO
    protocol handshake, then serially pulls (async, op, args) requests
    from *inq*, sends them one at a time, and pushes lifecycle and reply
    events onto *outq* as (event-name, args) tuples for Handlers.run.

    NOTE(review): Python 2 only -- uses print statements, the `apply`
    builtin, and `async` as an identifier (a keyword since Python 3.7).
    """

    # Class-level defaults for per-instance state.
    msgid = 0          # id of the last request sent
    closed = 0
    queueing = True
    # protocol: the server's banner once the handshake completed, else None.
    # message: the in-flight request as [msgid, op, args(, send-time)],
    # or None when nothing is outstanding.
    protocol = message = None

    def __init__(self, addr, session, inq, outq):
        """Connect and run the session loop; only returns on 'stop'."""
        self.session = session
        self.addr = addr
        def output(op, *args):
            outq.put((op, args))
        self.output = output
        self.condition = condition = threading.Condition()
        self.ngi = zc.ngi.async.Implementation()
        self.ngi.connect(addr, self)

        with condition:
            while 1:
                #print '__init__', self.protocol, self.message, inq.empty(),
                # Only send when the handshake is done and no request is
                # outstanding: strictly one request in flight at a time.
                if self.protocol and not self.message:
                    try:
                        callargs = inq.get(True, 1.0)
                    except:
                        # Queue timeout (Empty); re-check state and retry.
                        #print 'queue timeout'
                        continue
                    if callargs == 'stop':
                        break
                    async, op, args = callargs
                    assert not async
                    self.call(op, args)
                condition.wait(1)

    def call(self, op, args):
        """Send one request, recording its send time for latency stats."""
        assert not self.message
        self.msgid += 1
        #print time.ctime(), 'call', self.msgid, op, args
        self.message = [self.msgid, op, args]
        data = cPickle.dumps((self.msgid, 0, op, args))
        append = self.message.append

        # @apply (Python 2 builtin) invokes _ immediately, producing a
        # generator that is handed to connection.writelines.  The send
        # time is appended to self.message only when the generator is
        # first consumed, i.e. when the data is actually written.
        @self.connection.writelines
        @apply
        def _():
            append(time.time())
            yield data

        self.output('request', op, args)

    def connected(self, connection):
        """zc.ngi callback: wrap the connection and start handling input."""
        self.output('connect')
        self.protocol = None
        self.connection = zc.ngi.adapters.Sized(connection)
        self.connection.set_handler(self)

    def failed_connect(self, reason):
        # zc.ngi callback: a connection attempt failed.
        print time.ctime(), self.session, 'WTF failed connect', reason

    def handle_close(self, connection, reason):
        """zc.ngi callback: drop session state and reconnect."""
        with self.condition:
            self.protocol = self.message = None

        self.connection = None
        print time.ctime(), self.session, 'WTF Closed', reason
        self.output('disconnect', 0)
        self.ngi.connect(self.addr, self)

    def handle_input(self, connection, message):
        """zc.ngi callback: handle one sized message from the server."""
        now = time.time()
        try:
            msgid, flags, op, args = cPickle.loads(message)
        except:
            # Not a pickle: presumably the protocol banner (starts with
            # 'Z') sent by the server at connection time -- echo it back
            # and register, completing the handshake.
            if message[0] == 'Z':
                with self.condition:
                    if self.protocol is None:
                        connection.write(message) # Echo it back
                        self.call('register', ('1', 1))
                        self.protocol = message
                        self.condition.notifyAll()
                        return

            print time.ctime(), self.session, 'bad message', repr(message)
            traceback.print_exception(*sys.exc_info())
            return

        if op == 'invalidateTransaction':
            # Server push, not a reply to anything we sent -- ignore.
            return
        #print time.ctime(), 'input', msgid, op
        if (op == '.reply'):
            ret = args
            with self.condition:
                # self.message is [msgid, op, args, send-time]; the send
                # time was appended by the generator set up in call().
                rmsgid, op, args, start = self.message
                assert rmsgid == msgid
                elapsed = now-start
                #print elapsed * 1000
                self.message = None
                #print 'notift reply'
                self.condition.notifyAll()

            self.output('reply', op, args, ret, elapsed)
+
zz = 0, 0  # (count, total-elapsed) zero pair seeding the per-op timing dicts
+
class Handlers:
    """Aggregate statistics over all Handler subprocesses.

    run() drains the shared output queue and dispatches each
    (event-name, args) pair to the method of the same name.
    """

    # Shared counters.  NOTE(review): "async" (a Python 3 keyword) is
    # assigned here but never referenced in the visible code.
    async = abandoned = active = 0

    def __init__(self, disconnected):
        # errtimes/times map op name -> (reply count, total elapsed seconds)
        self.errtimes = {}
        self.times = {}
        # Number of sessions still expected to connect.
        self.disconnected = disconnected
        self.connected = self.maxactive = self.calls = self.replies = 0
        self.errors = 0
        # Set once every expected session has connected (see connect()).
        self.event = threading.Event()

    def __repr__(self):
        return ("%(connected)s %(disconnected)s %(maxactive)s"
                " %(calls)s %(replies)s %(errors)s"
                % self.__dict__)

    def run(self, queue):
        """Dispatch events from *queue* forever (daemon-thread target)."""
        while 1:
            got = queue.get()
            op, args = got
            getattr(self, op)(*args)

    def connect(self):
        # A handler connected; release main() once all have connected.
        self.disconnected -= 1
        if not self.disconnected:
            self.event.set()
        self.connected += 1

    def disconnect(self, abandoned):
        # A handler lost its connection, abandoning *abandoned* in-flight
        # requests (currently always reported as 0 by Handler).
        self.disconnected += 1
        self.connected -= 1
        self.abandoned += abandoned
        self.active -= abandoned

    def request(self, op, args):
        # A request was sent; track the high-water mark of in-flight work.
        self.active += 1
        self.maxactive = max(self.maxactive, self.active)
        self.calls += 1

    def reply(self, op, args, ret, elapsed):
        """Record one reply, classifying (class, exception) pairs as errors."""
        self.active -= 1
        self.replies += 1
        if (isinstance(ret, tuple)
            and len(ret) == 2
            and isinstance(ret[1], Exception)
            ):
            n, t = self.errtimes.get(op, zz)
            self.errtimes[op] = n+1, t+elapsed
            #print '  OOPS', op, args, elapsed, ret[0].__name__, ret[1]
            self.errors += 1
        else:
            n, t = self.times.get(op, zz)
            self.times[op] = n+1, t+elapsed
        sys.stdout.flush()
+
+
+
def parse_addr(addr):
    """Parse a "host:port" string into a (host, port) tuple.

    The port is taken from the text after the *last* colon, so host
    strings that themselves contain colons still parse.  Raises
    ValueError when no colon is present or the port is not an integer
    (the original raised IndexError on a missing colon).
    """
    host, port = addr.rsplit(':', 1)
    return host, int(port)
+
def print_times(last_times, times, label):
    """Print mean per-op call time in milliseconds since the previous
    snapshot and return a copy of *times* to use as the next snapshot.

    *last_times*/*times* map op name -> (count, total elapsed seconds);
    passing the returned dict back in as *last_times* makes each report
    cover only the interval since the previous one.
    """
    print 'Time per op (milliseconds)', label
    # Copy before returning so the caller's snapshot is stable even
    # though the live dict keeps being updated by the Handlers thread.
    times = times.copy()
    for op in sorted(times):
        n, t = times[op]
        last = last_times.get(op)
        if last:
            # Subtract the previous snapshot to get interval deltas.
            n -= last[0]
            t -= last[1]
        if n:
            print "%20s %10d %10.3f" % (op, n, t*1000/n)

    return times
+
+def main(args=None):
+    """Usage: script address log source
+
+    Where:
+
+       address
+          The address of the zeo server to provide input to.
+
+       log
+          zeo input log
+
+    """
+    if args is None:
+        args = sys.argv[1:]
+
+
+    # Maybe add options for following:
+    singe_threaded = False
+    simulate_ssd = False
+
+    print "$Id$"
+    print args
+    addr = args.pop(0)
+    log = args.pop(0)
+    if args:
+        max_records = int(args.pop(0))
+    else:
+        max_records = 999999999
+    assert not args
+    addr = parse_addr(addr)
+
+    log = Log(log, set(('loadEx', 'loadBefore', 'sendBlob')))
+
+    # Set up the client connections
+    sessions = {}
+    nhandlers = 0
+    handlers_queue = Queue()
+    for session, timetime, msgid, async, op, args in log:
+        if singe_threaded:
+            session = '1'
+        if session not in sessions:
+            handler_queue = Queue()
+            process = Process(
+                target = Handler,
+                args = (addr, nhandlers, handler_queue, handlers_queue),
+                )
+            process.daemon = True
+            process.start()
+            sessions[session] = handler_queue
+            nhandlers += 1
+
+    nsessions = len(sessions)
+    handlers = Handlers(nsessions)
+    thread = threading.Thread(target=handlers.run, args=(handlers_queue, ))
+    thread.setDaemon(True)
+    thread.start()
+
+    handlers.event.wait(10)
+    if not handlers.event.is_set():
+        raise ValueError("Couldn't connect.", handlers)
+
+    # Now, we're ready to replay.
+    nrecords = 0
+    start = lastnow = time.time()
+    firsttt = lasttt = log.start()
+    speed = speed1 = None
+    last_times = {}
+    for session, timetime, msgid, async, op, args in log:
+
+        if singe_threaded:
+            session = '1'
+
+        if simulate_ssd:
+            if op == 'loadEx':
+                args = (ZODB.utils.z64,)
+            else:
+                assert op in ('loadBefore', 'sendBlob')
+                continue
+
+        nwaaa = 0
+        while nrecords-handlers.replies-handlers.errors > nsessions*3:
+            if nwaaa and nwaaa%1000 == 0:
+                print 'waiting', nrecords, handlers.replies, handlers.errors,
+                print nrecords-handlers.replies-handlers.errors
+                sys.stdout.flush()
+
+            time.sleep(.01)
+            nwaaa += 1
+
+        if nrecords and (nrecords%10000 == 0):
+            if (nrecords%100000 == 0):
+                last_times = print_times(last_times, handlers.times,
+                                         "after %s operations" % nrecords)
+
+            now = time.time()
+            if now > start:
+                speed = (timetime-firsttt) / (now-start)
+            if now > lastnow:
+                speed1 = (timetime-lasttt) / (now-lastnow)
+            lastnow = now
+            lasttt = timetime
+
+            print nrecords,
+            print time.strftime('%H:%M:%S', time.localtime(time.time())),
+            print ZODB.TimeStamp.TimeStamp(time_stamp(timetime)), handlers,
+            print speed, speed1, handlers.active, handlers_queue.qsize()
+
+            sys.stdout.flush()
+
+
+        nrecords += 1
+
+        handlers.maxactive = 0
+
+
+        sessions[session].put((async, op, args))
+        if nrecords >= max_records:
+            break
+
+    print '='*70
+
+    print speed, nrecords
+
+    for op in sorted(handlers.errtimes):
+        n, t = handlers.times[op]
+        print 'err', op, n, t/n
+
+    print_times(last_times, handlers.times,
+                "after %s transactions" % nrecords)
+
+    print_times({}, handlers.times, "overall")
+    sys.stdout.flush()
+


Property changes on: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native



More information about the checkins mailing list