[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py checkpoint

Jim Fulton jim at zope.com
Thu Sep 10 15:26:41 EDT 2009


Log message for revision 103716:
  checkpoint
  

Changed:
  U   ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py

-=-
Modified: ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py	2009-09-10 19:24:46 UTC (rev 103715)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py	2009-09-10 19:26:41 UTC (rev 103716)
@@ -1348,21 +1348,48 @@
     def info(self, arg):
         self.rpc.callAsync('info', arg)
 
+# original:
+#     def storeBlob(self, oid, serial, blobfilename):
+
+#         # This "returns" after the data are sent because the result
+#         # message will be added to the output queue after the iterator.
+
+#         def store():
+#             yield ('receiveBlobStart', (oid, serial))
+#             f = open(blobfilename, 'rb')
+#             while 1:
+#                 chunk = f.read(59000)
+#                 if not chunk:
+#                     break
+#                 yield ('receiveBlobChunk', (oid, serial, chunk, ))
+#             f.close()
+#             yield ('receiveBlobStop', (oid, serial))
+
+#         self.rpc.callAsyncIterator(store())
+
+    @ZEO.thready.delayed
     def storeBlob(self, oid, serial, blobfilename):
 
-        def store():
-            yield ('receiveBlobStart', (oid, serial))
-            f = open(blobfilename, 'rb')
-            while 1:
-                chunk = f.read(59000)
-                if not chunk:
-                    break
-                yield ('receiveBlobChunk', (oid, serial, chunk, ))
-            f.close()
-            yield ('receiveBlobStop', (oid, serial))
+        self.rpc.callAsync('receiveBlobStart', oid, serial)
 
-        self.rpc.callAsyncIterator(store())
+        f = open(blobfilename, 'rb')
+        event = threading.Event()
+        while 1:
+            chunk = f.read(65536)
+            if not chunk:
+                break
+            self.rpc.callAsyncIterator(
+                send_one_chunk(oid, serial, chunk, event))
+            event.wait()
+            event.clear()
 
+        f.close()
+        self.rpc.callAsync('receiveBlobStop', oid, serial)
+
+def send_one_chunk(oid, serial, chunk, event):
+    yield ('receiveBlobChunk', (oid, serial, chunk, ))
+    event.set()
+
 class ClientStub308(ClientStub):
 
     def invalidateTransaction(self, tid, args):



More information about the Zodb-checkins mailing list