[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py Added options:

Jim Fulton jim at zope.com
Mon Jul 25 11:27:32 EDT 2011


Log message for revision 122342:
  Added options:
  
  - S3 folder (BUCKET/FOLDER) to load blobs from
  
  - status port: a port on the server to connect to for status info
    (e.g. the output of free and the tail of the iostat log)
  
  Made max_records an option.
  

Changed:
  U   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py

-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-07-25 13:41:39 UTC (rev 122341)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-07-25 15:27:32 UTC (rev 122342)
@@ -21,6 +21,7 @@
 from multiprocessing import Process, Queue
 # from threading import Thread as Process
 # from Queue import Queue
+import optparse
 import os
 import sys
 import threading
@@ -240,6 +241,38 @@
 
             self.output('reply', op, args, ret, elapsed)
 
+class S3Handler(Handler):
+
+    def __init__(self, folder, addr, session, inq, outq):
+        bucket_name, self.folder = os.environ['S3_FOLDER'].split('/', 1)
+        import boto.s3.connection
+        import boto.s3.key
+        self.s3 = boto.s3.key.Key(
+            boto.s3.connection.S3Connection().get_bucket(bucket_name))
+        Handler.__init__(self, addr, session, inq, outq)
+
+    def call(self, op, args):
+        if op == 'sendBlob':
+            oid, serial = args
+            key.key = "%s/%s/%s" % (
+                self.folder, oid.encode('hex'), serial.encode('hex'))
+            self.output('request', op, args)
+            f = tempfile.TemporaryFile()
+            t = time.time()
+            try:
+                key.get_contents_to_file(f)
+                ret = None
+            except Exception, v:
+                ret = None, v
+            elapsed = time.time() - t
+            self.output('reply', op, args, ret, elapsed)
+            f.close()
+
+        else:
+            Handler.call(self, op, args)
+
+
+
 zz = 0, 0
 
 class Handlers:
@@ -314,25 +347,9 @@
 
     return times
 
-# Update the following three (3) commands for your use-case and uncomment them
-# before running this script.
+parser = optparse.OptionParser("""
+Usage: %prog [options] address log
 
-# FREE_CMD = 'free'
-# VMSTAT_CMD = 'tail -n 1 vmstat.log'
-# IOSTAT_CMD = 'tail -n 10 iostat.log | grep -B 1 -A 6 avg-cpu'
-
-def print_meminfo(host):
-    print "free: "
-    os.system(FREE_CMD)
-    print "vmstat: "
-    os.system(VMSTAT_CMD)
-    print "iostat: "
-    os.system(IOSTAT_CMD)
-    print
-
-def main(args=None):
-    """Usage: script address log source
-
     Where:
 
        address
@@ -340,14 +357,22 @@
 
        log
           zeo input log
+""")
+parser.add_option("--max_records", "-m", dest='max_records',
+                  type="int", default=999999999,
+                  help="""
+Maximum number of records to process.
+""")
+parser.add_option("--s3-folder", "-s", dest='s3',
+                  help="""
+Get blobs from the given s3 folder: BUCKET/FOLDER
+""")
+parser.add_option("--status-port", "-p", dest='status_port',
+                  type="int",
+                  help="Port to get status data from.")
 
-       sessionids
-          path to file containing session ids.
 
-       max_records (optional. default is 999999999)
-          integer value indicating how many records should be considered.
-
-    """
+def main(args=None):
     if args is None:
         args = sys.argv[1:]
 
@@ -358,14 +383,11 @@
 
     print "$Id$"
     print args
-    addr = args.pop(0)
-    log = args.pop(0)
-    sessionids = args.pop(0)
-    if args:
-        max_records = int(args.pop(0))
-    else:
-        max_records = 999999999
-    assert not args
+
+    options, args = parser.parse_args(args)
+    addr, log, sessionids = args
+    max_records = options.max_records
+
     addr = parse_addr(addr)
 
     log = Log(log, set(('loadEx', 'sendBlob')))
@@ -382,10 +404,17 @@
             session = '1'
         if session not in sessions:
             handler_queue = Queue()
-            process = Process(
-                target = Handler,
-                args = (addr, nhandlers, handler_queue, handlers_queue),
-                )
+            if options.s3:
+                process = Process(
+                    target = S3Handler,
+                    args = (options.s3,
+                            addr, nhandlers, handler_queue, handlers_queue),
+                    )
+            else:
+                process = Process(
+                    target = Handler,
+                    args = (addr, nhandlers, handler_queue, handlers_queue),
+                    )
             process.daemon = True
             process.start()
             processes.append(process)
@@ -431,7 +460,7 @@
 
         if nrecords and (nrecords%10000 == 0):
             if (nrecords%100000 == 0):
-                print_meminfo(addr[0])
+                system("nc %s %s" % (addr[0], options.status_port))
                 last_times = print_times(last_times, handlers.times,
                                          "after %s operations" % nrecords)
                 print



More information about the checkins mailing list