[Checkins] SVN: zc.FileStorage/dev/src/zc/FileStorage/__init__.py
Decided to make this a monkey patch to make it easier to reuse tests.
Jim Fulton
jim at zope.com
Mon Dec 3 18:47:37 EST 2007
Log message for revision 82113:
Decided to make this a monkey patch to make it easier to reuse tests.
Made progress toward getting the tests to pass.
Changed:
U zc.FileStorage/dev/src/zc/FileStorage/__init__.py
-=-
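A minimal sketch of the monkey-patch approach described in the log message, assuming, as the code at the end of this diff does, that ZODB's FileStorage.pack() looks up FileStoragePacker through the ZODB.FileStorage.FileStorage module namespace; rebinding that name lets the stock FileStorage tests exercise the replacement packer without subclassing:

    import sys

    import ZODB.FileStorage.FileStorage          # ensure the target module is loaded
    from ZODB.FileStorage.format import FileStorageFormatter

    class FileStoragePacker(FileStorageFormatter):
        # Replacement packing logic goes here; see the full class in the
        # diff below.
        pass

    # Rebind the name that FileStorage.pack() uses to create its packer, so
    # existing FileStorage tests run against the replacement packer.
    sys.modules['ZODB.FileStorage.FileStorage'].FileStoragePacker = FileStoragePacker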
Modified: zc.FileStorage/dev/src/zc/FileStorage/__init__.py
===================================================================
--- zc.FileStorage/dev/src/zc/FileStorage/__init__.py 2007-12-03 19:11:16 UTC (rev 82112)
+++ zc.FileStorage/dev/src/zc/FileStorage/__init__.py 2007-12-03 23:47:37 UTC (rev 82113)
@@ -12,85 +12,20 @@
#
##############################################################################
+import sys
+
+from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
+from ZODB.serialize import referencesf
+from ZODB.utils import p64, u64, z64
+
+import BTrees.LLBTree, BTrees.LOBTree
import ZODB.FileStorage
import ZODB.FileStorage.fspack
-import BTrees.LLBTree
+import ZODB.fsIndex
-class FileStorage(ZODB.FileStorage.FileStorage):
+class FileStoragePacker(FileStorageFormatter):
- def pack(self, t, referencesf):
- """Copy data from the current database file to a packed file
-
- Non-current records from transactions with time-stamp strings less
- than packtss are ommitted. As are all undone records.
-
- Also, data back pointers that point before packtss are resolved and
- the associated data are copied, since the old records are not copied.
- """
- if self._is_read_only:
- raise POSException.ReadOnlyError()
-
- stop=`TimeStamp(*time.gmtime(t)[:5]+(t%60,))`
- if stop==z64: raise FileStorageError('Invalid pack time')
-
- # If the storage is empty, there's nothing to do.
- if not self._index:
- return
-
- self._lock_acquire()
- try:
- if self._pack_is_in_progress:
- raise FileStorageError('Already packing')
- self._pack_is_in_progress = True
- current_size = self.getSize()
- finally:
- self._lock_release()
-
- p = FileStoragePacker(self._file_name, stop,
- self._lock_acquire, self._lock_release,
- self._commit_lock_acquire,
- self._commit_lock_release,
- current_size, referencesf)
- try:
- opos = None
- try:
- opos = p.pack()
- except RedundantPackWarning, detail:
- logger.info(str(detail))
- if opos is None:
- return
- oldpath = self._file_name + ".old"
- self._lock_acquire()
- try:
- self._file.close()
- try:
- if os.path.exists(oldpath):
- os.remove(oldpath)
- os.rename(self._file_name, oldpath)
- except Exception:
- self._file = open(self._file_name, 'r+b')
- raise
-
- # OK, we're beyond the point of no return
- os.rename(self._file_name + '.pack', self._file_name)
- self._file = open(self._file_name, 'r+b')
- self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
- p.oid2tid, p.toid2tid,
- p.toid2tid_delete)
- self._pos = opos
- self._save_index()
- finally:
- self._lock_release()
- finally:
- if p.locked:
- self._commit_lock_release()
- self._lock_acquire()
- self._pack_is_in_progress = False
- self._lock_release()
-
-class FileStoragePacker(ZODB.FileStorage.fspack.FileStoragePacker):
-
- def __init__(self, path, stop, la, lr, cla, clr, current_size, referencesf):
+ def __init__(self, path, stop, la, lr, cla, clr, current_size):
self._name = path
# We open our own handle on the storage so that much of pack can
# proceed in parallel. It's important to close this file at every
@@ -101,8 +36,6 @@
# when the storage's disk is doing other io
self._file = open(path, "rb", 1<<25)
-
-
self._path = path
self._stop = stop
self.locked = 0
@@ -116,23 +49,12 @@
self._commit_lock_acquire = cla
self._commit_lock_release = clr
- # The packer will use several indexes.
- # index: oid -> pos
- # tindex: oid -> pos, for current txn
- # oid2tid: not used by the packer
+ self.ltid = z64
- self.index = BTrees.fsBTree.fsIndex()
- self.tindex = {}
- self.oid2tid = {}
- self.toid2tid = {}
- self.toid2tid_delete = {}
-
- self.referencesf = referencesf
-
def pack(self):
packed, index, references, packpos = self.buildPackIndex(
self._stop, self.file_end)
- is packed:
+ if packed:
# nothing to do
self._file.close()
return None
@@ -151,42 +73,27 @@
new_pos = self.copyFromPacktime(packpos, self.file_end, output, index)
- self._commit_lock_acquire()
- self.locked = 1
- self._lock_acquire()
- try:
- # Re-open the file in unbuffered mode.
- # The main thread may write new transactions to the file,
- # which creates the possibility that we will read a status
- # 'c' transaction into the pack thread's stdio buffer even
- # though we're acquiring the commit lock. Transactions
- # can still be in progress throughout much of packing, and
- # are written to the same physical file but via a distinct
- # Python file object. The code used to leave off the
- # trailing 0 argument, and then on every platform except
- # native Windows it was observed that we could read stale
- # data from the tail end of the file.
- self._file.close() # else self.gc keeps the original alive & open
- self._file = open(self._path, "rb", 0)
- self._file.seek(0, 2)
- self.file_end = self._file.tell()
- finally:
- self._lock_release()
- if ipos < self.file_end:
- self.copyRest(ipos)
-
# OK, we've copied everything. Now we need to wrap things up.
- pos = self._tfile.tell()
- self._tfile.flush()
- self._tfile.close()
+ pos = output.tell()
+ output.flush()
+ output.close()
self._file.close()
+ # Grrrrr. The caller wants these attrs
+ self.index = index
+ self.vindex = {}
+ self.tindex = {}
+ self.tvindex = {}
+ self.oid2tid = {}
+ self.toid2tid = {}
+ self.toid2tid_delete = {}
+
return pos
def buildPackIndex(self, stop, file_end):
- index = BTrees.fsBTree.fsIndex()
+ index = ZODB.fsIndex.fsIndex()
references = BTrees.LOBTree.LOBTree()
pos = 4L
packed = True
@@ -208,9 +115,12 @@
if dh.version:
self.fail(pos, "Versions are not supported")
index[dh.oid] = pos
+ ioid = u64(dh.oid)
refs = self._refs(dh)
if refs:
- references[oid] = refs
+ references[ioid] = refs
+ else:
+ references.pop(ioid, None)
pos += dh.recordlen()
@@ -237,10 +147,14 @@
self.checkData(th, tpos, dh, pos)
if dh.version:
self.fail(pos, "Versions are not supported")
- refs = self._refs(dh, self.references.get(oid))
+ ioid = u64(dh.oid)
+ refs = self._refs(dh, references.get(ioid))
if refs:
- references[oid] = refs
+ references[ioid] = refs
+ else:
+ references.pop(ioid, None)
+
pos += dh.recordlen()
tlen = self._read_num(pos)
@@ -258,7 +172,7 @@
if not dh.plen:
return initial
- refs = self.referencesf(self._file.read(dh.plen))
+ refs = referencesf(self._file.read(dh.plen))
if not refs:
return initial
@@ -266,26 +180,28 @@
initial = BTrees.LLBTree.LLSet()
initial.update(u64(oid) for oid in refs)
result = BTrees.LLBTree.LLSet()
- result.__setstate___((tuple(initial),))
+ result.__setstate__((tuple(initial),))
return result
def gc(self, index, references):
to_do = [0]
- reachable = BTrees.fsBTree.fsIndex()
+ reachable = ZODB.fsIndex.fsIndex()
while to_do:
ioid = to_do.pop()
oid = p64(ioid)
if oid in reachable:
continue
- reachable[oid] = index.pop(oid)
+
+ reachable[oid] = index[oid]
to_do.extend(references.pop(ioid, ()))
references.clear()
return reachable
def copyToPacktime(self, packpos, index, output):
pos = new_pos = self._metadata_size
+ self._file.seek(0)
output.write(self._file.read(self._metadata_size))
- new_index = BTrees.fsBTree.fsIndex()
+ new_index = ZODB.fsIndex.fsIndex()
while pos < packpos:
th = self._read_txn_header(pos)
@@ -344,91 +260,106 @@
return new_index, new_pos
- def copyFromPacktime(self, pos, file_end, output, index):
- while pos < file_end:
- th = self._read_txn_header(pos)
- new_tpos = output.tell()
- output.write(th.asString())
- tend = pos + th.tlen
- pos += th.headerlen()
- while pos < tend:
- h = self._read_data_header(pos)
+ def copyFromPacktime(self, input_pos, file_end, output, index):
+ while input_pos < file_end:
+ input_pos = self._copyNewTrans(input_pos, output, index)
- prev_txn = None
- if h.plen:
- data = self._file.read(h.plen)
- else:
- # If a current record has a backpointer, fetch
- # refs and data from the backpointer. We need
- # to write the data in the new record.
- data = self.fetchBackpointer(h.oid, h.back)
- if h.back:
- prev_txn = self.getTxnFromData(h.oid, h.back)
+ def copyRest(self, input_pos, output, index):
+ # Copy data records written since packing started.
- if h.version:
- self.fail(pos, "Versions are not supported.")
+ self._commit_lock_acquire()
+ self.locked = 1
+ # Re-open the file in unbuffered mode.
- self._copier.copy(h.oid, h.tid, data, h.version, prev_txn,
- new_tpos, output.tell())
+ # The main thread may write new transactions to the file,
+ # which creates the possibility that we will read a status
+ # 'c' transaction into the pack thread's stdio buffer even
+ # though we're acquiring the commit lock. Transactions
+ # can still be in progress throughout much of packing, and
+ # are written to the same physical file but via a distinct
+ # Python file object. The code used to leave off the
+ # trailing 0 argument, and then on every platform except
+ # native Windows it was observed that we could read stale
+ # data from the tail end of the file.
+ self._file.close()
+ self._file = open(self._path, "rb", 0)
+ try:
+ while 1:
+ # The call below will raise CorruptedDataError at EOF.
+ input_pos = self._copyNewTrans(
+ input_pos, output, index,
+ self._commit_lock_acquire, self._commit_lock_release)
+ except CorruptedDataError, err:
+ # The last call to copyOne() will raise
+ # CorruptedDataError, because it will attempt to read past
+ # the end of the file. Double-check that the exception
+ # occurred for this reason.
+ self._file.seek(0, 2)
+ endpos = self._file.tell()
+ if endpos != err.pos:
+ raise
- pos += h.recordlen()
-
- new_pos = output.tell()
- tlen = p64(new_pos - new_tpos)
- output.write(tlen)
- new_pos += 8
-
- if tlen != h.tlen:
- # Update the transaction length
- output.seek(new_tpos + 8)
- output.write(tlen)
- output.seek(new_pos)
-
- pos += 8
-
- return new_index, new_pos
-
-
-
- def copyOne(self, ipos):
- # The call below will raise CorruptedDataError at EOF.
- th = self._read_txn_header(ipos)
- self._lock_counter += 1
- if self._lock_counter % 20 == 0:
- self._commit_lock_release()
- pos = self._tfile.tell()
- self._copier.setTxnPos(pos)
- self._tfile.write(th.asString())
- tend = ipos + th.tlen
- ipos += th.headerlen()
-
- while ipos < tend:
- h = self._read_data_header(ipos)
- ipos += h.recordlen()
+ def _copyNewTrans(self, input_pos, output, index,
+ acquire=None, release=None):
+ tindex = {}
+ copier = ZODB.FileStorage.fspack.PackCopier(
+ output, index, {}, tindex, {},
+ )
+ th = self._read_txn_header(input_pos)
+ if release is not None:
+ release()
+
+ output_tpos = output.tell()
+ copier.setTxnPos(output_tpos)
+ output.write(th.asString())
+ tend = input_pos + th.tlen
+ input_pos += th.headerlen()
+ while input_pos < tend:
+ h = self._read_data_header(input_pos)
prev_txn = None
if h.plen:
data = self._file.read(h.plen)
else:
+ # If a current record has a backpointer, fetch
+ # refs and data from the backpointer. We need
+ # to write the data in the new record.
data = self.fetchBackpointer(h.oid, h.back)
if h.back:
prev_txn = self.getTxnFromData(h.oid, h.back)
if h.version:
- self.fail(ipos, "Versions are not supported.")
+ self.fail(input_pos, "Versions are not supported.")
- self._copier.copy(h.oid, h.tid, data, prev_txn, pos,
- self._tfile.tell())
+ copier.copy(h.oid, h.tid, data, '', prev_txn,
+ output_tpos, output.tell())
- tlen = self._tfile.tell() - pos
- assert tlen == th.tlen
- self._tfile.write(p64(tlen))
- ipos += 8
+ input_pos += h.recordlen()
- self.index.update(self.tindex)
- self.tindex.clear()
- if self._lock_counter % 20 == 0:
- self._commit_lock_acquire()
- return ipos
+ output_pos = output.tell()
+ tlen = p64(output_pos - output_tpos)
+ output.write(tlen)
+ output_pos += 8
+ if tlen != th.tlen:
+ # Update the transaction length
+ output.seek(output_tpos + 8)
+ output.write(tlen)
+ output.seek(output_pos)
+ index.update(tindex)
+ tindex.clear()
+
+ if acquire is not None:
+ acquire()
+
+ return input_pos + 8
+
+ def fetchBackpointer(self, oid, back):
+ if back == 0:
+ return None
+ data, tid = self._loadBackTxn(oid, back, 0)
+ return data
+
+sys.modules['ZODB.FileStorage.FileStorage'
+ ].FileStoragePacker = FileStoragePacker
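For completeness, a hypothetical usage sketch; the file name is illustrative, and importing zc.FileStorage is assumed to be what applies the patch above (the module-level assignment at the end of __init__.py):

    import time

    import zc.FileStorage                    # importing applies the packer monkey patch
    import ZODB.FileStorage
    from ZODB.serialize import referencesf

    storage = ZODB.FileStorage.FileStorage('Data.fs')
    storage.pack(time.time(), referencesf)   # packs via the patched FileStoragePacker
    storage.close()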