[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py Hacked in some untested :) code for testing loading blobs via
Jim Fulton
jim at zope.com
Mon Aug 1 15:54:36 EDT 2011
Log message for revision 122438:
Hacked in some untested :) code for testing loading blobs via
connectiony HTTP.
Changed:
U zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-08-01 19:17:55 UTC (rev 122437)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py 2011-08-01 19:54:36 UTC (rev 122438)
@@ -14,13 +14,14 @@
"""Replay ZEO input logs in read-only mode
"""
+from multiprocessing import Process, Queue
+# from threading import Thread as Process
+# from Queue import Queue
import cPickle
+import httplib
import logging
import marshal
-from multiprocessing import Process, Queue
-# from threading import Thread as Process
-# from Queue import Queue
import optparse
import os
import sys
@@ -29,9 +30,11 @@
import time
import traceback
import transaction
+import urlparse
+import zc.ngi.adapters
import zc.ngi.async
-import zc.ngi.adapters
import ZODB.TimeStamp
+import ZODB.blob
import ZODB.utils
sys.setcheckinterval(999)
@@ -272,8 +275,58 @@
else:
Handler.call(self, op, args)
+class HTTPHandler(Handler):
+ def __init__(self, url, addr, session, inq, outq):
+ if not url[-1] == '/':
+ url += '/'
+ url = urlparse.urlparse(url)
+ self.blob_prefix = url.path
+ self.blob_conn = httplib.HTTPConnection(url.netloc)
+ self.blob_layout = ZODB.blob.BushyLayout()
+ Handler.__init__(self, addr, session, inq, outq)
+ def call(self, op, args):
+ if op == 'sendBlob':
+ conn = self.blob_conn
+ path = self.blob_layout.getBlobFilePath(*args)
+ self.output('request', op, args)
+ try:
+ try:
+ t = time.time()
+ conn.request("GET", self.blob_prefix+path,
+ headers=dict(Connection='Keep-Alive'))
+ r = conn.getresponse()
+ except httplib.HTTPException:
+ t = time.time()
+ conn.connect()
+ conn.request("GET", self.blob_prefix+path,
+ headers=dict(Connection='Keep-Alive'))
+ r = conn.getresponse()
+
+ self.read_blob(r)
+ ret = None
+ except Exception, v:
+ ret = None, v
+
+ elapsed = time.time() - t
+
+ self.output('reply', op, args, ret, elapsed)
+ else:
+ Handler.call(self, op, args)
+
+
+ def read_blob(self, r):
+ r.read()
+
+class HTTPWritingHandler(HTTPHandler):
+
+ def read_blob(self, r):
+ f = tempfile.TemporaryFile()
+ f.write(r.read())
+ f.close()
+
+
zz = 0, 0
class Handlers:
@@ -368,6 +421,16 @@
help="""
Get blobs from the given s3 folder: BUCKET/FOLDER
""")
+parser.add_option("--blob-url", "-u", dest='blob_url',
+ help="""
+Get blobs from an HTTP server at the given URL.
+""")
+parser.add_option("--blob-url-with-blobs-written", "-U",
+ dest='blob_url_written',
+ help="""
+Get blobs from an HTTP server at the given URL.
+Write bob data to a temporary file to simulate real download
+""")
parser.add_option("--status-port", "-p", dest='status_port',
type="int",
help="Port to get status data from.")
@@ -411,6 +474,18 @@
args = (options.s3,
addr, nhandlers, handler_queue, handlers_queue),
)
+ elif options.blob_url:
+ process = Process(
+ target = HTTPHandler,
+ args = (options.blob_url,
+ addr, nhandlers, handler_queue, handlers_queue),
+ )
+ elif options.blob_url_written:
+ process = Process(
+ target = HTTPWritingHandler,
+ args = (options.blob_url_written,
+ addr, nhandlers, handler_queue, handlers_queue),
+ )
else:
process = Process(
target = Handler,
More information about the checkins
mailing list