[Checkins] SVN: zc.FileStorage/dev/src/zc/FileStorage/__init__.py
Changed to do most of the packing work in a subprocess to:
Jim Fulton
jim at zope.com
Fri Dec 7 19:57:26 EST 2007
Log message for revision 82193:
Changed to do most of the packing work in a subprocess to:
- Avoid large process growth
- take advantage of additional processors
Checkpointing. I have one semi-spurious test failure to deal with.
Changed:
U zc.FileStorage/dev/src/zc/FileStorage/__init__.py
-=-
Modified: zc.FileStorage/dev/src/zc/FileStorage/__init__.py
===================================================================
--- zc.FileStorage/dev/src/zc/FileStorage/__init__.py 2007-12-07 22:56:26 UTC (rev 82192)
+++ zc.FileStorage/dev/src/zc/FileStorage/__init__.py 2007-12-08 00:57:26 UTC (rev 82193)
@@ -12,7 +12,9 @@
#
##############################################################################
+import cPickle
import os
+import subprocess
import sys
from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
@@ -32,7 +34,7 @@
"""
def seek(self, pos, whence=0):
- if pos != self.tell():
+ if whence or (pos != self.tell()):
file.seek(self, pos, whence)
@@ -49,7 +51,6 @@
# when the storage is disk is doing other io
self._file = OptionalSeekFile(path, "rb")
- self._path = path
self._stop = stop
self.locked = 0
self.file_end = current_size
@@ -65,12 +66,214 @@
self.ltid = z64
def pack(self):
+ script = self._name+'.packscript'
+ open(script, 'w').write(pack_script_template % dict(
+ path = self._name,
+ stop = self._stop,
+ size = self.file_end,
+ syspath = sys.path,
+ ))
+ proc = subprocess.Popen(
+ (sys.executable, script),
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ close_fds=True,
+ )
+
+
+ proc.stdin.close()
+ out = proc.stdout.read()
+ if proc.wait():
+ raise RuntimeError('The Pack subprocess failed\n'
+ +'-'*60+out+'-'*60+'\n')
+
+ packindex_path = self._name+".packindex"
+ if not os.path.exists(packindex_path):
+ return # already packed or pack didn't benefit
+
+ index, opos = cPickle.Unpickler(open(packindex_path, 'rb')).load()
+ os.remove(packindex_path)
+ os.remove(self._name+".packscript")
+
+ output = OptionalSeekFile(self._name + ".pack", "r+b")
+ output.seek(0, 2)
+ assert output.tell() == opos
+ self.copyRest(self.file_end, output, index)
+
+ # OK, we've copied everything. Now we need to wrap things up.
+ pos = output.tell()
+ output.close()
+
+ # Grrrrr. The caller wants these attrs
+ self.index = index
+ self.vindex = {}
+ self.tindex = {}
+ self.tvindex = {}
+ self.oid2tid = {}
+ self.toid2tid = {}
+ self.toid2tid_delete = {}
+
+ return pos
+
+ def copyRest(self, input_pos, output, index):
+ # Copy data records written since packing started.
+
+ self._commit_lock_acquire()
+ self.locked = 1
+ # Re-open the file in unbuffered mode.
+
+ # The main thread may write new transactions to the file,
+ # which creates the possibility that we will read a status
+ # 'c' transaction into the pack thread's stdio buffer even
+ # though we're acquiring the commit lock. Transactions
+ # can still be in progress throughout much of packing, and
+ # are written to the same physical file but via a distinct
+ # Python file object. The code used to leave off the
+ # trailing 0 argument, and then on every platform except
+ # native Windows it was observed that we could read stale
+ # data from the tail end of the file.
+ self._file = OptionalSeekFile(self._name, "rb", 0)
+ try:
+ try:
+ while 1:
+ # The call below will raise CorruptedDataError at EOF.
+ input_pos = self._copyNewTrans(
+ input_pos, output, index,
+ self._commit_lock_acquire, self._commit_lock_release)
+ except CorruptedDataError, err:
+ # The last call to copyOne() will raise
+ # CorruptedDataError, because it will attempt to read past
+ # the end of the file. Double-check that the exception
+ # occurred for this reason.
+ self._file.seek(0, 2)
+ endpos = self._file.tell()
+ if endpos != err.pos:
+ raise
+ finally:
+ self._file.close()
+
+ def _copyNewTrans(self, input_pos, output, index,
+ acquire=None, release=None):
+ tindex = {}
+ copier = PackCopier(output, index, {}, tindex, {})
+ th = self._read_txn_header(input_pos)
+ if release is not None:
+ release()
+
+ output_tpos = output.tell()
+ copier.setTxnPos(output_tpos)
+ output.write(th.asString())
+ tend = input_pos + th.tlen
+ input_pos += th.headerlen()
+ while input_pos < tend:
+ h = self._read_data_header(input_pos)
+ prev_txn = None
+ if h.plen:
+ data = self._file.read(h.plen)
+ else:
+ # If a current record has a backpointer, fetch
+ # refs and data from the backpointer. We need
+ # to write the data in the new record.
+ data = self.fetchBackpointer(h.oid, h.back)
+ if h.back:
+ prev_txn = self.getTxnFromData(h.oid, h.back)
+
+ if h.version:
+ self.fail(pos, "Versions are not supported.")
+
+ copier.copy(h.oid, h.tid, data, '', prev_txn,
+ output_tpos, output.tell())
+
+ input_pos += h.recordlen()
+
+ output_pos = output.tell()
+ tlen = p64(output_pos - output_tpos)
+ output.write(tlen)
+ output_pos += 8
+
+ if tlen != th.tlen:
+ # Update the transaction length
+ output.seek(output_tpos + 8)
+ output.write(tlen)
+ output.seek(output_pos)
+
+ index.update(tindex)
+ tindex.clear()
+
+ if acquire is not None:
+ acquire()
+
+ return input_pos + 8
+
+ def fetchBackpointer(self, oid, back):
+ if back == 0:
+ return None
+ data, tid = self._loadBackTxn(oid, back, 0)
+ return data
+
+sys.modules['ZODB.FileStorage.FileStorage'
+ ].FileStoragePacker = FileStoragePacker
+ZODB.FileStorage.FileStorage.supportsVersions = lambda self: False
+
+class PackCopier(ZODB.FileStorage.fspack.PackCopier):
+
+ def _txn_find(self, tid, stop_at_pack):
+ # _pos always points just past the last transaction
+ pos = self._pos
+ while pos > 4:
+ self._file.seek(pos - 8)
+ pos = pos - u64(self._file.read(8)) - 8
+ self._file.seek(pos)
+ h = self._file.read(TRANS_HDR_LEN)
+ _tid = h[:8]
+ if _tid == tid:
+ return pos
+ if stop_at_pack:
+ if h[16] == 'p':
+ break
+
+ return None
+
+
+pack_script_template = """
+
+import sys
+
+sys.path[:] = %(syspath)r
+
+import zc.FileStorage
+
+packer = zc.FileStorage.PackProcess(%(path)r, %(stop)r, %(size)r)
+packer.pack()
+"""
+
+class PackProcess(FileStoragePacker):
+
+ def __init__(self, path, stop, current_size):
+ self._name = path
+ # We open our own handle on the storage so that much of pack can
+ # proceed in parallel. It's important to close this file at every
+ # return point, else on Windows the caller won't be able to rename
+ # or remove the storage file.
+
+ # We set the buffer quite high (32MB) to try to reduce seeks
+ # when the storage is disk is doing other io
+ self._file = OptionalSeekFile(path, "rb")
+
+ self._name = path
+ self._stop = stop
+ self.locked = 0
+ self.file_end = current_size
+
+ self.ltid = z64
+
+ def pack(self):
packed, index, references, packpos = self.buildPackIndex(
self._stop, self.file_end)
if packed:
# nothing to do
self._file.close()
- return None
+ return
self.updateReferences(references, packpos, self.file_end)
index = self.gc(index, references)
@@ -82,29 +285,20 @@
self._file.close()
output.close()
os.remove(self._name + ".pack")
- return None
+ return
- input_pos = self.copyFromPacktime(packpos, self.file_end, output, index)
- self.copyRest(input_pos, output, index)
+ self.copyFromPacktime(packpos, self.file_end, output, index)
- # OK, we've copied everything. Now we need to wrap things up.
- pos = output.tell()
+ # Save the index so the parent process can use it as a starting point.
+ f = open(self._name + ".packindex", 'wb')
+ cPickle.Pickler(f, 1).dump((index, output.tell()))
+ f.close()
output.flush()
+ os.fsync(output.fileno())
output.close()
self._file.close()
- # Grrrrr. The caller wants these attrs
- self.index = index
- self.vindex = {}
- self.tindex = {}
- self.tvindex = {}
- self.oid2tid = {}
- self.toid2tid = {}
- self.toid2tid_delete = {}
- return pos
-
-
def buildPackIndex(self, stop, file_end):
index = ZODB.fsIndex.fsIndex()
references = BTrees.LOBTree.LOBTree()
@@ -326,121 +520,3 @@
while input_pos < file_end:
input_pos = self._copyNewTrans(input_pos, output, index)
return input_pos
-
- def copyRest(self, input_pos, output, index):
- # Copy data records written since packing started.
-
- self._commit_lock_acquire()
- self.locked = 1
- # Re-open the file in unbuffered mode.
-
- # The main thread may write new transactions to the file,
- # which creates the possibility that we will read a status
- # 'c' transaction into the pack thread's stdio buffer even
- # though we're acquiring the commit lock. Transactions
- # can still be in progress throughout much of packing, and
- # are written to the same physical file but via a distinct
- # Python file object. The code used to leave off the
- # trailing 0 argument, and then on every platform except
- # native Windows it was observed that we could read stale
- # data from the tail end of the file.
- self._file.close()
- self._file = OptionalSeekFile(self._path, "rb", 0)
-
- try:
- while 1:
- # The call below will raise CorruptedDataError at EOF.
- input_pos = self._copyNewTrans(
- input_pos, output, index,
- self._commit_lock_acquire, self._commit_lock_release)
- except CorruptedDataError, err:
- # The last call to copyOne() will raise
- # CorruptedDataError, because it will attempt to read past
- # the end of the file. Double-check that the exception
- # occurred for this reason.
- self._file.seek(0, 2)
- endpos = self._file.tell()
- if endpos != err.pos:
- raise
-
- def _copyNewTrans(self, input_pos, output, index,
- acquire=None, release=None):
- tindex = {}
- copier = PackCopier(output, index, {}, tindex, {})
- th = self._read_txn_header(input_pos)
- if release is not None:
- release()
-
- output_tpos = output.tell()
- copier.setTxnPos(output_tpos)
- output.write(th.asString())
- tend = input_pos + th.tlen
- input_pos += th.headerlen()
- while input_pos < tend:
- h = self._read_data_header(input_pos)
- prev_txn = None
- if h.plen:
- data = self._file.read(h.plen)
- else:
- # If a current record has a backpointer, fetch
- # refs and data from the backpointer. We need
- # to write the data in the new record.
- data = self.fetchBackpointer(h.oid, h.back)
- if h.back:
- prev_txn = self.getTxnFromData(h.oid, h.back)
-
- if h.version:
- self.fail(pos, "Versions are not supported.")
-
- copier.copy(h.oid, h.tid, data, '', prev_txn,
- output_tpos, output.tell())
-
- input_pos += h.recordlen()
-
- output_pos = output.tell()
- tlen = p64(output_pos - output_tpos)
- output.write(tlen)
- output_pos += 8
-
- if tlen != th.tlen:
- # Update the transaction length
- output.seek(output_tpos + 8)
- output.write(tlen)
- output.seek(output_pos)
-
- index.update(tindex)
- tindex.clear()
-
- if acquire is not None:
- acquire()
-
- return input_pos + 8
-
- def fetchBackpointer(self, oid, back):
- if back == 0:
- return None
- data, tid = self._loadBackTxn(oid, back, 0)
- return data
-
-sys.modules['ZODB.FileStorage.FileStorage'
- ].FileStoragePacker = FileStoragePacker
-ZODB.FileStorage.FileStorage.supportsVersions = lambda self: False
-
-class PackCopier(ZODB.FileStorage.fspack.PackCopier):
-
- def _txn_find(self, tid, stop_at_pack):
- # _pos always points just past the last transaction
- pos = self._pos
- while pos > 4:
- self._file.seek(pos - 8)
- pos = pos - u64(self._file.read(8)) - 8
- self._file.seek(pos)
- h = self._file.read(TRANS_HDR_LEN)
- _tid = h[:8]
- if _tid == tid:
- return pos
- if stop_at_pack:
- if h[16] == 'p':
- break
-
- return None
More information about the Checkins
mailing list