[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py checkpoint.

Jim Fulton jim at zope.com
Wed Sep 23 13:05:55 EDT 2009


Log message for revision 104455:
  checkpoint.
  

Changed:
  A   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py

-=-
Added: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py	                        (rev 0)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py	2009-09-23 17:05:54 UTC (rev 104455)
@@ -0,0 +1,241 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Replay ZEO input logs
+
+This is a bit tricky.  If we just play the log, the writes are likely
+to be processed differently than they were before, because the server
+will handle them in different orders.  This has lots of
+implications. For example, a set of conflicting transactions may have
+a different winner, leading to different objects being created.
+
+We'll deal with this by aborting any writes in the input.  We'll still
+process the writes, but we'll convert transaction_vote calls to
+transaction_aborts.
+
+We'll get actual input data by "recovering" data from a full data file.  We'll play back the input log only as
+fast as we recover.  We won't send an input log message whos time is
+>= the time of the last recovered transaction.
+
+So, to replay, you'll need:
+
+- an input log
+- a test database
+- a non-test source database
+
+The test database is truncated to the beginning of the input log.
+
+The reply script will iterate over the source database, starting at
+the time if the first input log message.  As we recover, we'll also
+send input messages.  We'll stop when we get to the end of the input
+log.
+
+We'll time 2-way calls.  We'll also time blob loads. Finally, we'll
+time how long it takes to play the input log.
+
+
+"""
+
+
+import cPickle
+import marshal
+import os
+import sys
+import threading
+import time
+import zc.ngi.async
+import zc.ngi.adapters
+import ZODB.Blob
+import ZODB.FileStorage
+import ZODB.POSException
+import ZODB.TimeStamp
+import ZODB.utils
+
+
+def time_stamp(timetime):
+    return repr(ZODB.TimeStamp.TimeStamp(
+        *time.gmtime(timetime)[:5]
+        +(time.gmtime(timetime)[5]+divmod(timetime,1)[1],)
+        ))
+
+def truncatefs(t, fs, blobs=None):
+    """Truncate a file storage to time (time.time) t.
+    """
+    tid = time_stamp(t)
+    it = ZODB.FileStorage.FileIterator(fs, start=tid)
+    open(fs, 'r+b').truncate(it._pos)
+    if blobs is None:
+        return
+    for base, dirs, files in os.walk(blobs):
+        for name in files:
+            if not name.endswith(BLOB_SUFFIX):
+                continue
+            serial = filename[:-len(ZODB.Blob.BLOB_SUFFIX)]
+            serial = ZODB.utils.repr_to_oid(serial)
+            if serial >= tid:
+                ZODB.Blob.remove_committed(os.path.join(base, name))
+
+
+def extract(fsname, log, outname):
+    start = time_stamp(log.start())
+    end = time_stamp(log.end())
+    out = open(outname, 'wb')
+    for t in ZODB.FileStorage.FileIterator(fsname, start, end):
+        marshal.dump(('t', t.tid, t.status, t.user, t.desc, t.extension),
+                     out)
+        for r in t:
+            marshal.dump(('r', r.oid, r.tid, r.data), out)
+        marshal.dump(('c',), out)
+    out.close()
+
+class Log(object):
+
+    def __init__(self, fname):
+        self.fname = fname
+
+    def start(self):
+        return iter(self).next()[1]
+
+    def end(self):
+        end = None
+        for x in self:
+            end = x[1]
+        return end
+
+    def __iter__(self):
+        f = open(self.fname)
+        while 1:
+            try:
+                session, timetime, message = marshal.load(f)
+            except EOFError:
+                break
+
+            msgid, async, op, args = cPickle.loads(message)
+
+            yield session, timetime, msgid, async, op, args
+
+class Handler:
+
+    protocol = None
+    msgid = 0
+
+    def __init__(self):
+        self.event = threading.Event()
+        self.queue = []
+        self.message = {}
+        self.errtimes = []
+        self.times = []
+
+    def connected(self, connection):
+        self.connection = zc.ngi.adapters.Sized(connection)
+        connection.set_handler(self)
+        self.event.set()
+
+    def failed_connect(self, reason):
+        print 'WTF failed connect', reason
+
+    def handle_close(self, reason):
+        print 'WTF Closed', reason
+
+    def handle_input(self, message):
+        if self.protocol is None:
+            self.protocol = message
+            self.connection.write(self.protocol) # Echo it back
+            self.call(0, 'register', ('1', 0))
+            self.event.set()
+            return
+
+        now = time.time()
+        msgid, flags, op, args = cPickle.loads(message)
+        if (op == '.reply'):
+            ret = args
+            op, args, start, blob = self.messages.pop(msgid)
+            if (isinstance(ret, tuple)
+                and len(ret) == 2
+                and isinstance(ret[1], Exception)
+                ):
+                err = ret[1]
+                self.errtimes.append(now-start)
+                if isinstance(err, ZODB.POSException.POSKeyError):
+                    if op == 'sendBlob':
+                        # Hm. May be due to a bad serial.
+                        # queue a
+                    self.queue.append((op, args))
+                    return
+                print 'OOPS', ret[0].__name__, ret[1]
+                # Maybe we should retry messages that generate errors.
+            else:
+                self.times.append(now-start)
+                if blob:
+                    assert op == 'loadEx'
+                    oid = args[0]
+                    tid = ret[1]
+                    self.queue.append('sendBlob', (oid, tid))
+        else:
+            if op == 'receiveBlobStop':
+                key = args
+                start = self.messages.pop(key)
+                self.times.append(now-start)
+
+    def call(self, async, op, args):
+        self.msgid += 1
+        if not async:
+            self.messages[msgid] = op, args, time.time(), 0
+
+        self.connection.write(cPickle.dumps((msgid, async, op, args)))
+
+def parse_addr(addr):
+    addr = addr.split(':')
+    return addr[0], int(addr[1])
+
+def main(args=None):
+    """Usage: script address log source
+
+    Where:
+
+       address
+          The address of the zeo server to provide input to.
+
+       log
+          zeo input log
+
+       source
+
+          A file-storage file that contains just the transaction
+          corresponding to the input log
+
+    """
+    if args is None:
+        args = sys.argv[1:]
+
+    [addr, log, source] = args
+    addr = parse_addr(addr)
+
+    # Set up the client connections
+    sessions = {}
+    for session, timetime, msgid, async, op, args in Log(log):
+        if session not in sessions:
+            handler = Handler()
+            sessions[session] = handler
+            zc.ngi.async.connect(addr, handler)
+
+    for handler in sessions.values():
+        handler.event.wait(10)
+        if not handler.event.is_set():
+            raise ValueError("Couldn't connect.")
+
+    # Now, we're ready to replay.
+    for session, timetime, msgid, async, op, args in Log(log):
+        if op in ('getAuthProtocol', 'register'):
+            continue
+        sessions[session].call(async, op, args)


Property changes on: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native



More information about the checkins mailing list