[Zope3-checkins] SVN: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/ Fixed bug in uploading of files - where the publisher was called multiple

Michael Kerrin michael.kerrin at openapp.biz
Fri Oct 7 18:31:43 EDT 2005


Log message for revision 38912:
  Fixed bug in uploading of files - where the publisher was called multiple
  times to upload a file over a certain size.
  
  The buffers.py file was copied from zope.server to here so that this package
  doesn't depend on the zope.server package.
  

Changed:
  A   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/buffers.py
  U   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/ftp.py

-=-
Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/buffers.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/buffers.py	2005-10-07 22:15:41 UTC (rev 38911)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/buffers.py	2005-10-07 22:31:43 UTC (rev 38912)
@@ -0,0 +1,236 @@
+##############################################################################
+#
+# Copyright (c) 2001-2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Buffers
+
+$Id$
+"""
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from StringIO import StringIO
+
+
+# copy_bytes controls the size of temp. strings for shuffling data around.
+COPY_BYTES = 1 << 18  # 256K
+
+# The maximum number of bytes to buffer in a simple string.
+STRBUF_LIMIT = 8192
+
+
+class FileBasedBuffer(object):
+
+    remain = 0
+
+    def __init__(self, file, from_buffer=None):
+        self.file = file
+        if from_buffer is not None:
+            from_file = from_buffer.getfile()
+            read_pos = from_file.tell()
+            from_file.seek(0)
+            while 1:
+                data = from_file.read(COPY_BYTES)
+                if not data:
+                    break
+                file.write(data)
+            self.remain = int(file.tell() - read_pos)
+            from_file.seek(read_pos)
+            file.seek(read_pos)
+
+    def __len__(self):
+        return self.remain
+
+    def append(self, s):
+        file = self.file
+        read_pos = file.tell()
+        file.seek(0, 2)
+        file.write(s)
+        file.seek(read_pos)
+        self.remain = self.remain + len(s)
+
+    def get(self, bytes=-1, skip=0):
+        file = self.file
+        if not skip:
+            read_pos = file.tell()
+        if bytes < 0:
+            # Read all
+            res = file.read()
+        else:
+            res = file.read(bytes)
+        if skip:
+            self.remain -= len(res)
+        else:
+            file.seek(read_pos)
+        return res
+
+    def skip(self, bytes, allow_prune=0):
+        if self.remain < bytes:
+            raise ValueError("Can't skip %d bytes in buffer of %d bytes" % (
+                                 bytes, self.remain))
+        self.file.seek(bytes, 1)
+        self.remain = self.remain - bytes
+
+    def newfile(self):
+        raise NotImplementedError()
+
+    def prune(self):
+        file = self.file
+        if self.remain == 0:
+            read_pos = file.tell()
+            file.seek(0, 2)
+            sz = file.tell()
+            file.seek(read_pos)
+            if sz == 0:
+                # Nothing to prune.
+                return
+        nf = self.newfile()
+        while 1:
+            data = file.read(COPY_BYTES)
+            if not data:
+                break
+            nf.write(data)
+        self.file = nf
+
+    def getfile(self):
+        return self.file
+
+
+
+class TempfileBasedBuffer(FileBasedBuffer):
+
+    def __init__(self, from_buffer=None):
+        FileBasedBuffer.__init__(self, self.newfile(), from_buffer)
+
+    def newfile(self):
+        from tempfile import TemporaryFile
+        return TemporaryFile('w+b')
+
+
+
+class StringIOBasedBuffer(FileBasedBuffer):
+
+    def __init__(self, from_buffer=None):
+        if from_buffer is not None:
+            FileBasedBuffer.__init__(self, StringIO(), from_buffer)
+        else:
+            # Shortcut. :-)
+            self.file = StringIO()
+
+    def newfile(self):
+        return StringIO()
+
+
+
+class OverflowableBuffer(object):
+    """
+    This buffer implementation has four stages:
+    - No data
+    - String-based buffer
+    - StringIO-based buffer
+    - Temporary file storage
+    The first two stages are fastest for simple transfers.
+    """
+
+    overflowed = 0
+    buf = None
+    strbuf = ''  # String-based buffer.
+
+    def __init__(self, overflow):
+        # overflow is the maximum to be stored in a StringIO buffer.
+        self.overflow = overflow
+
+    def __len__(self):
+        buf = self.buf
+        if buf is not None:
+            return len(buf)
+        else:
+            return len(self.strbuf)
+
+    def _create_buffer(self):
+        # print 'creating buffer'
+        strbuf = self.strbuf
+        if len(strbuf) >= self.overflow:
+            self._set_large_buffer()
+        else:
+            self._set_small_buffer()
+        buf = self.buf
+        if strbuf:
+            buf.append(self.strbuf)
+            self.strbuf = ''
+        return buf
+
+    def _set_small_buffer(self):
+        self.buf = StringIOBasedBuffer(self.buf)
+        self.overflowed = 0
+
+    def _set_large_buffer(self):
+        self.buf = TempfileBasedBuffer(self.buf)
+        self.overflowed = 1
+
+    def append(self, s):
+        buf = self.buf
+        if buf is None:
+            strbuf = self.strbuf
+            if len(strbuf) + len(s) < STRBUF_LIMIT:
+                self.strbuf = strbuf + s
+                return
+            buf = self._create_buffer()
+        buf.append(s)
+        sz = len(buf)
+        if not self.overflowed:
+            if sz >= self.overflow:
+                self._set_large_buffer()
+
+    def get(self, bytes=-1, skip=0):
+        buf = self.buf
+        if buf is None:
+            strbuf = self.strbuf
+            if not skip:
+                return strbuf
+            buf = self._create_buffer()
+        return buf.get(bytes, skip)
+
+    def skip(self, bytes, allow_prune=0):
+        buf = self.buf
+        if buf is None:
+            strbuf = self.strbuf
+            if allow_prune and bytes == len(strbuf):
+                # We could slice instead of converting to
+                # a buffer, but that would eat up memory in
+                # large transfers.
+                self.strbuf = ''
+                return
+            buf = self._create_buffer()
+        buf.skip(bytes, allow_prune)
+
+    def prune(self):
+        """
+        A potentially expensive operation that removes all data
+        already retrieved from the buffer.
+        """
+        buf = self.buf
+        if buf is None:
+            self.strbuf = ''
+            return
+        buf.prune()
+        if self.overflowed:
+            sz = len(buf)
+            if sz < self.overflow:
+                # Revert to a faster buffer.
+                self._set_small_buffer()
+
+    def getfile(self):
+        buf = self.buf
+        if buf is None:
+            buf = self._create_buffer()
+        return buf.getfile()


Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/buffers.py
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/ftp.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/ftp.py	2005-10-07 22:15:41 UTC (rev 38911)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/ftp.py	2005-10-07 22:31:43 UTC (rev 38912)
@@ -27,26 +27,29 @@
 from twisted.protocols import ftp
 
 from utils import PublisherFileSystem
+from buffers import OverflowableBuffer
 
 class ConsumerObject(object):
     def __init__(self, fs, name):
         self.fs = fs
         self.name = name
-        self.total = 0
+        ## value copied from zope.server.adjustments.Adjustments.inbuf_overflow
+        inbuf_overflow = 525000
+        self.buffer = OverflowableBuffer(inbuf_overflow)
 
     def registerProducer(self, producer, streaming):
         assert streaming
 
     def unregisterProducer(self):
-        pass
+        self._finish()
 
+    def _finish(self):
+        self.fs.writefile(self.name, self.buffer.getfile())
+
     def write(self, bytes):
-        ## XXX - this is going to mess up the transaction machinary since
-        ## for a big file this method could be called hundreds of times.
-        instream = StringIO(bytes)
-        self.fs.writefile(self.name, instream, start = self.total)
-        self.total += len(bytes)
+        self.buffer.append(bytes)
 
+
 class ZopeFTPShell(object):
     """An abstraction of the shell commands used by the FTP protocol
     for a given user account



More information about the Zope3-Checkins mailing list