[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py Hacked in some untested :) code for testing loading blobs via

Jim Fulton jim at zope.com
Mon Aug 1 15:54:36 EDT 2011


Log message for revision 122438:
  Hacked in some untested :) code for testing loading blobs via
  persistent-connection (Keep-Alive) HTTP.
  

Changed:
  U   zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py

-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-08-01 19:17:55 UTC (rev 122437)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/readonlyreplay.py	2011-08-01 19:54:36 UTC (rev 122438)
@@ -14,13 +14,14 @@
 """Replay ZEO input logs in read-only mode
 """
 
+from multiprocessing import Process, Queue
+# from threading import Thread as Process
+# from Queue import Queue
 
 import cPickle
+import httplib
 import logging
 import marshal
-from multiprocessing import Process, Queue
-# from threading import Thread as Process
-# from Queue import Queue
 import optparse
 import os
 import sys
@@ -29,9 +30,11 @@
 import time
 import traceback
 import transaction
+import urlparse
+import zc.ngi.adapters
 import zc.ngi.async
-import zc.ngi.adapters
 import ZODB.TimeStamp
+import ZODB.blob
 import ZODB.utils
 
 sys.setcheckinterval(999)
@@ -272,8 +275,58 @@
         else:
             Handler.call(self, op, args)
 
+class HTTPHandler(Handler):
 
+    """Handler that fetches blobs ('sendBlob' ops) from an HTTP server."""
+
+    def __init__(self, url, addr, session, inq, outq):
+        url = urlparse.urlparse(url if url.endswith('/') else url + '/')
+        self.blob_prefix = url.path
+        self.blob_conn = httplib.HTTPConnection(url.netloc)
+        self.blob_layout = ZODB.blob.BushyLayout()
+        Handler.__init__(self, addr, session, inq, outq)
 
+    def call(self, op, args):
+        """Time an HTTP GET for 'sendBlob'; defer other ops to Handler."""
+        if op != 'sendBlob':
+            Handler.call(self, op, args)
+            return
+
+        conn = self.blob_conn
+        path = self.blob_prefix + self.blob_layout.getBlobFilePath(*args)
+        headers = dict(Connection='Keep-Alive')
+        self.output('request', op, args)
+        try:
+            try:
+                t = time.time()
+                conn.request("GET", path, headers=headers)
+                r = conn.getresponse()
+            except httplib.HTTPException:
+                # Reconnect and retry once, restarting the timer.
+                t = time.time()
+                conn.connect()
+                conn.request("GET", path, headers=headers)
+                r = conn.getresponse()
+            self.read_blob(r)
+            ret = None
+        except Exception, v:
+            ret = None, v
+
+        self.output('reply', op, args, ret, time.time() - t)
+
+    def read_blob(self, r):
+        """Read and discard the response body; subclasses may override."""
+        # Must be a method, not nested in call(), so self.read_blob resolves.
+        r.read()
+
+class HTTPWritingHandler(HTTPHandler):
+    """HTTPHandler that also writes each blob body to a temporary file."""
+
+    def read_blob(self, r):
+        with tempfile.TemporaryFile() as f:  # closed even if the read fails
+            f.write(r.read())
+
+
 zz = 0, 0
 
 class Handlers:
@@ -368,6 +421,16 @@
                   help="""
 Get blobs from the given s3 folder: BUCKET/FOLDER
 """)
+parser.add_option("--blob-url", "-u", dest='blob_url',
+                  help="""
+Get blobs from an HTTP server at the given URL.
+""")
+parser.add_option("--blob-url-with-blobs-written", "-U",
+                  dest='blob_url_written',
+                  help="""
+Get blobs from an HTTP server at the given URL.
+Write blob data to a temporary file to simulate a real download.
+""")
 parser.add_option("--status-port", "-p", dest='status_port',
                   type="int",
                   help="Port to get status data from.")
@@ -411,6 +474,18 @@
                     args = (options.s3,
                             addr, nhandlers, handler_queue, handlers_queue),
                     )
+            elif options.blob_url:
+                process = Process(
+                    target = HTTPHandler,
+                    args = (options.blob_url,
+                            addr, nhandlers, handler_queue, handlers_queue),
+                    )
+            elif options.blob_url_written:
+                process = Process(
+                    target = HTTPWritingHandler,
+                    args = (options.blob_url_written,
+                            addr, nhandlers, handler_queue, handlers_queue),
+                    )
             else:
                 process = Process(
                     target = Handler,



More information about the checkins mailing list