[Checkins] SVN: zc.FileStorage/dev/src/zc/FileStorage/__init__.py Changed to do most of the packing work in a subprocess to:

Jim Fulton jim at zope.com
Fri Dec 7 19:57:26 EST 2007


Log message for revision 82193:
  Changed to do most of the packing work in a subprocess to:
  
  - Avoid large process growth
  
  - Take advantage of additional processors
  
  Checkpointing.  I have one semi-spurious test failure to deal with.
  

Changed:
  U   zc.FileStorage/dev/src/zc/FileStorage/__init__.py

-=-
Modified: zc.FileStorage/dev/src/zc/FileStorage/__init__.py
===================================================================
--- zc.FileStorage/dev/src/zc/FileStorage/__init__.py	2007-12-07 22:56:26 UTC (rev 82192)
+++ zc.FileStorage/dev/src/zc/FileStorage/__init__.py	2007-12-08 00:57:26 UTC (rev 82193)
@@ -12,7 +12,9 @@
 #
 ##############################################################################
 
+import cPickle
 import os
+import subprocess
 import sys
 
 from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
@@ -32,7 +34,7 @@
     """
 
     def seek(self, pos, whence=0):
-        if pos != self.tell():
+        if whence or (pos != self.tell()): # pos is relative if whence != 0
             file.seek(self, pos, whence)
     
 
@@ -49,7 +51,6 @@
         # when the storage is disk is doing other io
         self._file = OptionalSeekFile(path, "rb")
 
-        self._path = path
         self._stop = stop
         self.locked = 0
         self.file_end = current_size
@@ -65,12 +66,214 @@
         self.ltid = z64
 
     def pack(self):
+        # Pack up to self._stop in a child process, then copy the rest here.
+        script = self._name+'.packscript'
+        packindex_path = self._name+".packindex"
+        if os.path.exists(packindex_path):
+            os.remove(packindex_path) # left by a failed run; never reuse it
+        f = open(script, 'w')
+        f.write(pack_script_template % dict(
+            path = self._name, stop = self._stop,
+            size = self.file_end, syspath = sys.path))
+        f.close() # flush the script to disk before the child runs it
+        proc = subprocess.Popen(
+            (sys.executable, script), stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+        proc.stdin.close()
+        out = proc.stdout.read()
+        status = proc.wait()
+        # copyRest() reopens the file; close now so no exit path leaks it.
+        self._file.close()
+        if status:
+            raise RuntimeError('The Pack subprocess failed\n'
+                               +'-'*60+out+'-'*60+'\n')
+        os.remove(script) # also removed when there is nothing to copy
+        if not os.path.exists(packindex_path):
+            return # already packed or pack didn't benefit
+        f = open(packindex_path, 'rb')
+        index, opos = cPickle.Unpickler(f).load()
+        f.close()
+        os.remove(packindex_path)
+        # Append transactions committed while the child ran, then close.
+        output = OptionalSeekFile(self._name + ".pack", "r+b")
+        try:
+            output.seek(0, 2)
+            if output.tell() != opos:
+                raise RuntimeError('Unexpected .pack file size')
+            self.copyRest(self.file_end, output, index)
+            pos = output.tell()
+        finally:
+            output.close()
+
+        # The caller reads these attributes after pack() returns.
+        self.index = index
+        self.vindex = {}
+        self.tindex = {}
+        self.tvindex = {}
+        self.oid2tid = {}
+        self.toid2tid = {}
+        self.toid2tid_delete = {}
+        return pos
+
+    def copyRest(self, input_pos, output, index):
+        # Copy data records written since packing started.  Returns with the
+        # commit lock still held (self.locked); the caller must release it.
+        self._commit_lock_acquire()
+        self.locked = 1
+        # Replace the buffered handle with an unbuffered one (see below).
+        self._file.close()
+        # The main thread may write new transactions to the file,
+        # which creates the possibility that we will read a status
+        # 'c' transaction into the pack thread's stdio buffer even
+        # though we're acquiring the commit lock.  Transactions
+        # can still be in progress throughout much of packing, and
+        # are written to the same physical file but via a distinct
+        # Python file object.  The code used to leave off the
+        # trailing 0 argument, and then on every platform except
+        # native Windows it was observed that we could read stale
+        # data from the tail end of the file.
+        self._file = OptionalSeekFile(self._name, "rb", 0)
+        try:
+            try:
+                while 1:
+                    # The call below will raise CorruptedDataError at EOF.
+                    input_pos = self._copyNewTrans(
+                        input_pos, output, index,
+                        self._commit_lock_acquire, self._commit_lock_release)
+            except CorruptedDataError, err:
+                # The last call to _copyNewTrans() will raise
+                # CorruptedDataError, because it will attempt to read past
+                # the end of the file.  Double-check that the exception
+                # occurred for this reason.
+                self._file.seek(0, 2)
+                endpos = self._file.tell()
+                if endpos != err.pos:
+                    raise
+        finally:
+            self._file.close()
+
+    def _copyNewTrans(self, input_pos, output, index,
+                      acquire=None, release=None): # copy one transaction
+        tindex = {}
+        copier = PackCopier(output, index, {}, tindex, {})
+        th = self._read_txn_header(input_pos)
+        if release is not None:
+            release()
+        # Copy the records without holding the lock (if one was passed in).
+        output_tpos = output.tell()
+        copier.setTxnPos(output_tpos)
+        output.write(th.asString())
+        tend = input_pos + th.tlen
+        input_pos += th.headerlen()
+        while input_pos < tend:
+            h = self._read_data_header(input_pos)
+            prev_txn = None
+            if h.plen:
+                data = self._file.read(h.plen)
+            else:
+                # If a current record has a backpointer, fetch
+                # refs and data from the backpointer.  We need
+                # to write the data in the new record.
+                data = self.fetchBackpointer(h.oid, h.back)
+                if h.back:
+                    prev_txn = self.getTxnFromData(h.oid, h.back)
+
+            if h.version:
+                self.fail(input_pos, "Versions are not supported.")
+
+            copier.copy(h.oid, h.tid, data, '', prev_txn,
+                        output_tpos, output.tell())
+
+            input_pos += h.recordlen()
+
+        output_pos = output.tell()
+        tlen = p64(output_pos - output_tpos)
+        output.write(tlen)
+        output_pos += 8
+        # Resolved backpointers can change the length; if so, fix the header.
+        if u64(tlen) != th.tlen:
+            # The length sits 8 bytes into the header, right after the tid.
+            output.seek(output_tpos + 8)
+            output.write(tlen)
+            output.seek(output_pos)
+
+        index.update(tindex)
+        tindex.clear()
+
+        if acquire is not None:
+            acquire()
+
+        return input_pos + 8
+
+    def fetchBackpointer(self, oid, back): # data via back, or None if 0
+        if back == 0:
+            return None
+        data, tid = self._loadBackTxn(oid, back, 0) # only data is used
+        return data
+
+sys.modules['ZODB.FileStorage.FileStorage' # the module, not the class
+            ].FileStoragePacker = FileStoragePacker # replace ZODB's packer
+ZODB.FileStorage.FileStorage.supportsVersions = lambda self: False
+
+class PackCopier(ZODB.FileStorage.fspack.PackCopier):
+    # Redefines _txn_find() for the copier used by _copyNewTrans().
+    def _txn_find(self, tid, stop_at_pack):
+        # _pos always points just past the last transaction
+        pos = self._pos
+        while pos > 4: # presumably 4 = length of the file's magic header
+            self._file.seek(pos - 8)
+            pos = pos - u64(self._file.read(8)) - 8 # back over one txn
+            self._file.seek(pos)
+            h = self._file.read(TRANS_HDR_LEN)
+            _tid = h[:8]
+            if _tid == tid:
+                return pos
+            if stop_at_pack:
+                if h[16] == 'p': # status byte; 'p' presumably = packed txn
+                    break
+
+        return None # not found, or the scan stopped at a pack
+
+
+pack_script_template = """
+
+import sys
+
+sys.path[:] = %(syspath)r
+
+import zc.FileStorage
+
+packer = zc.FileStorage.PackProcess(%(path)r, %(stop)r, %(size)r)
+packer.pack()
+"""
+
+class PackProcess(FileStoragePacker):
+
+    def __init__(self, path, stop, current_size):
+        # Built by the script from pack_script_template (the pack child).
+        self._name = path
+        # We open our own handle on the storage so that much of pack can
+        # proceed in parallel.  It's important to close this file at every
+        # return point, else on Windows the caller won't be able to rename
+        # or remove the storage file.
+
+        # Default buffering is used (no buffer size is passed); absolute
+        # seeks to the current position are skipped by OptionalSeekFile.
+        self._file = OptionalSeekFile(path, "rb")
+
+        self._stop = stop
+        self.locked = 0
+        self.file_end = current_size
+
+        self.ltid = z64
+
+    def pack(self):
         packed, index, references, packpos = self.buildPackIndex(
             self._stop, self.file_end)
         if packed:
             # nothing to do
             self._file.close()
-            return None
+            return # no .packindex: the parent has nothing to copy
         
         self.updateReferences(references, packpos, self.file_end)
         index = self.gc(index, references)
@@ -82,29 +285,20 @@
             self._file.close()
             output.close()
             os.remove(self._name + ".pack")
-            return None
+            return # no .packindex: the parent has nothing to copy
 
-        input_pos = self.copyFromPacktime(packpos, self.file_end, output, index)
-        self.copyRest(input_pos, output, index)
+        self.copyFromPacktime(packpos, self.file_end, output, index)
 
-        # OK, we've copied everything. Now we need to wrap things up.
-        pos = output.tell()
+        opos = output.tell() # where the parent resumes appending to .pack
         output.flush()
+        os.fsync(output.fileno())
         output.close()
         self._file.close()
+        f = open(self._name + ".packindex", 'wb') # last: .pack is on disk
+        cPickle.Pickler(f, 1).dump((index, opos))
+        f.close()
 
-        # Grrrrr. The caller wants these attrs
-        self.index = index
-        self.vindex = {}
-        self.tindex = {}
-        self.tvindex = {}
-        self.oid2tid = {}
-        self.toid2tid = {}
-        self.toid2tid_delete = {}
 
-        return pos
-
-
     def buildPackIndex(self, stop, file_end):
         index = ZODB.fsIndex.fsIndex()
         references = BTrees.LOBTree.LOBTree()
@@ -326,121 +520,3 @@
         while input_pos < file_end:
             input_pos = self._copyNewTrans(input_pos, output, index)
         return input_pos
-
-    def copyRest(self, input_pos, output, index):
-        # Copy data records written since packing started.
-
-        self._commit_lock_acquire()
-        self.locked = 1
-        # Re-open the file in unbuffered mode.
-
-        # The main thread may write new transactions to the file,
-        # which creates the possibility that we will read a status
-        # 'c' transaction into the pack thread's stdio buffer even
-        # though we're acquiring the commit lock.  Transactions
-        # can still be in progress throughout much of packing, and
-        # are written to the same physical file but via a distinct
-        # Python file object.  The code used to leave off the
-        # trailing 0 argument, and then on every platform except
-        # native Windows it was observed that we could read stale
-        # data from the tail end of the file.
-        self._file.close()
-        self._file = OptionalSeekFile(self._path, "rb", 0)
-
-        try:
-            while 1:
-                # The call below will raise CorruptedDataError at EOF.
-                input_pos = self._copyNewTrans(
-                    input_pos, output, index,
-                    self._commit_lock_acquire, self._commit_lock_release)
-        except CorruptedDataError, err:
-            # The last call to copyOne() will raise
-            # CorruptedDataError, because it will attempt to read past
-            # the end of the file.  Double-check that the exception
-            # occurred for this reason.
-            self._file.seek(0, 2)
-            endpos = self._file.tell()
-            if endpos != err.pos:
-                raise
-
-    def _copyNewTrans(self, input_pos, output, index,
-                      acquire=None, release=None):
-        tindex = {}
-        copier = PackCopier(output, index, {}, tindex, {})
-        th = self._read_txn_header(input_pos)
-        if release is not None:
-            release()
-            
-        output_tpos = output.tell()
-        copier.setTxnPos(output_tpos)
-        output.write(th.asString())
-        tend = input_pos + th.tlen
-        input_pos += th.headerlen()
-        while input_pos < tend:
-            h = self._read_data_header(input_pos)
-            prev_txn = None
-            if h.plen:
-                data = self._file.read(h.plen)
-            else:
-                # If a current record has a backpointer, fetch
-                # refs and data from the backpointer.  We need
-                # to write the data in the new record.
-                data = self.fetchBackpointer(h.oid, h.back)
-                if h.back:
-                    prev_txn = self.getTxnFromData(h.oid, h.back)
-
-            if h.version:
-                self.fail(pos, "Versions are not supported.")
-
-            copier.copy(h.oid, h.tid, data, '', prev_txn,
-                        output_tpos, output.tell())
-
-            input_pos += h.recordlen()
-
-        output_pos = output.tell()
-        tlen = p64(output_pos - output_tpos)
-        output.write(tlen)
-        output_pos += 8
-
-        if tlen != th.tlen:
-            # Update the transaction length
-            output.seek(output_tpos + 8)
-            output.write(tlen)
-            output.seek(output_pos)
-
-        index.update(tindex)
-        tindex.clear()
-
-        if acquire is not None:
-            acquire()
-
-        return input_pos + 8
-
-    def fetchBackpointer(self, oid, back):
-        if back == 0:
-            return None
-        data, tid = self._loadBackTxn(oid, back, 0)
-        return data
-
-sys.modules['ZODB.FileStorage.FileStorage'
-            ].FileStoragePacker = FileStoragePacker
-ZODB.FileStorage.FileStorage.supportsVersions = lambda self: False
-
-class PackCopier(ZODB.FileStorage.fspack.PackCopier):
-
-    def _txn_find(self, tid, stop_at_pack):
-        # _pos always points just past the last transaction
-        pos = self._pos
-        while pos > 4:
-            self._file.seek(pos - 8)
-            pos = pos - u64(self._file.read(8)) - 8
-            self._file.seek(pos)
-            h = self._file.read(TRANS_HDR_LEN)
-            _tid = h[:8]
-            if _tid == tid:
-                return pos
-            if stop_at_pack:
-                if h[16] == 'p':
-                    break
-
-        return None



More information about the Checkins mailing list