[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py Added options:
Jim Fulton
jim at zope.com
Mon Jul 25 11:27:32 EDT 2011
Log message for revision 122342:
Added options:
- s3 folder for loading blobs from s3
- status command port to connect to on the server to get status info
(e.g. free and tail of iostat log.)
Made max_records an option.
Changed:
U zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-07-25 13:41:39 UTC (rev 122341)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-07-25 15:27:32 UTC (rev 122342)
@@ -21,6 +21,7 @@
from multiprocessing import Process, Queue
# from threading import Thread as Process
# from Queue import Queue
+import optparse
import os
import sys
import threading
@@ -240,6 +241,38 @@
self.output('reply', op, args, ret, elapsed)
+class S3Handler(Handler):
+    """Handler that satisfies 'sendBlob' requests by downloading from S3.
+
+    ``folder`` has the form BUCKET/FOLDER.  Blobs are fetched from keys
+    named FOLDER/<hex oid>/<hex serial> in that bucket.  Every other
+    operation is passed through unchanged to Handler.call.
+    """
+
+    def __init__(self, folder, addr, session, inq, outq):
+        # Honor the folder passed by the caller (the --s3-folder option).
+        # Previously the argument was ignored and $S3_FOLDER was always
+        # required; the environment variable is kept only as a fallback.
+        if not folder:
+            folder = os.environ['S3_FOLDER']
+        bucket_name, self.folder = folder.split('/', 1)
+        # boto is imported here so it is only required when S3 is used.
+        import boto.s3.connection
+        import boto.s3.key
+        # A single Key object is reused; call() points it at each blob.
+        self.s3 = boto.s3.key.Key(
+            boto.s3.connection.S3Connection().get_bucket(bucket_name))
+        # NOTE(review): Handler.__init__ is deliberately invoked last, as
+        # in the original -- presumably it does not return until the
+        # handler has finished its work; confirm before reordering.
+        Handler.__init__(self, addr, session, inq, outq)
+
+    def call(self, op, args):
+        """Perform one operation, timing 'sendBlob' as an S3 download.
+
+        For 'sendBlob', ``args`` is ``(oid, serial)``.  A 'request' record
+        and then a 'reply' record (with the elapsed download time) are
+        emitted through self.output.  The downloaded data is written to a
+        temporary file and discarded.
+        """
+        if op != 'sendBlob':
+            Handler.call(self, op, args)
+            return
+
+        # Imported locally: tempfile is not among the module-level
+        # imports visible for this file, and a local import is harmless
+        # if time is already imported at module level.
+        import tempfile
+        import time
+
+        oid, serial = args
+        # The Key object lives on self.s3; the bare name 'key' used here
+        # before was never bound and raised NameError on every sendBlob.
+        key = self.s3
+        key.key = "%s/%s/%s" % (
+            self.folder, oid.encode('hex'), serial.encode('hex'))
+        self.output('request', op, args)
+        f = tempfile.TemporaryFile()
+        try:
+            t = time.time()
+            try:
+                key.get_contents_to_file(f)
+                ret = None
+            except Exception as v:
+                # A failed download is reported as (None, exception)
+                # rather than aborting the replay.
+                ret = None, v
+            elapsed = time.time() - t
+            self.output('reply', op, args, ret, elapsed)
+        finally:
+            # Always release the temporary file, even if reporting fails.
+            f.close()
+
+
+
zz = 0, 0
class Handlers:
@@ -314,25 +347,9 @@
return times
-# Update the following three (3) commands for your use-case and uncomment them
-# before running this script.
+parser = optparse.OptionParser("""
+Usage: %prog [options] address log
-# FREE_CMD = 'free'
-# VMSTAT_CMD = 'tail -n 1 vmstat.log'
-# IOSTAT_CMD = 'tail -n 10 iostat.log | grep -B 1 -A 6 avg-cpu'
-
-def print_meminfo(host):
- print "free: "
- os.system(FREE_CMD)
- print "vmstat: "
- os.system(VMSTAT_CMD)
- print "iostat: "
- os.system(IOSTAT_CMD)
- print
-
-def main(args=None):
- """Usage: script address log source
-
Where:
address
@@ -340,14 +357,22 @@
log
zeo input log
+""")
+parser.add_option("--max_records", "-m", dest='max_records',
+ type="int", default=999999999,
+ help="""
+Maximum number of records to process.
+""")
+parser.add_option("--s3-folder", "-s", dest='s3',
+ help="""
+Get blobs from the given s3 folder: BUCKET/FOLDER
+""")
+parser.add_option("--status-port", "-p", dest='status_port',
+ type="int",
+ help="Port to get status data from.")
- sessionids
- path to file containing session ids.
- max_records (optional. default is 999999999)
- integer value indicating how many records should be considered.
-
- """
+def main(args=None):
if args is None:
args = sys.argv[1:]
@@ -358,14 +383,11 @@
print "$Id$"
print args
- addr = args.pop(0)
- log = args.pop(0)
- sessionids = args.pop(0)
- if args:
- max_records = int(args.pop(0))
- else:
- max_records = 999999999
- assert not args
+
+ options, args = parser.parse_args(args)
+ addr, log, sessionids = args
+ max_records = options.max_records
+
addr = parse_addr(addr)
log = Log(log, set(('loadEx', 'sendBlob')))
@@ -382,10 +404,17 @@
session = '1'
if session not in sessions:
handler_queue = Queue()
- process = Process(
- target = Handler,
- args = (addr, nhandlers, handler_queue, handlers_queue),
- )
+ if options.s3:
+ process = Process(
+ target = S3Handler,
+ args = (options.s3,
+ addr, nhandlers, handler_queue, handlers_queue),
+ )
+ else:
+ process = Process(
+ target = Handler,
+ args = (addr, nhandlers, handler_queue, handlers_queue),
+ )
process.daemon = True
process.start()
processes.append(process)
@@ -431,7 +460,7 @@
if nrecords and (nrecords%10000 == 0):
if (nrecords%100000 == 0):
- print_meminfo(addr[0])
+ system("nc %s %s" % (addr[0], options.status_port))
last_times = print_times(last_times, handlers.times,
"after %s operations" % nrecords)
print
More information about the checkins
mailing list