[Checkins] SVN: zc.FileStorage/dev/src/zc/FileStorage/__init__.py Decided to make this a monkey patch to make it easier to reuse tests.

Jim Fulton jim at zope.com
Mon Dec 3 18:47:37 EST 2007


Log message for revision 82113:
  Decided to make this a monkey patch to make it easier to reuse tests.
  
  Made progress toward getting the tests to pass.
  

Changed:
  U   zc.FileStorage/dev/src/zc/FileStorage/__init__.py
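
The monkey patch at the end of the diff below replaces the FileStoragePacker attribute of the ZODB.FileStorage.FileStorage module, which is the name FileStorage.pack() uses to construct its packer; that is what lets ZODB's existing packing tests exercise this packer without subclassing FileStorage. A minimal sketch of the effect, assuming this __init__.py has already been imported as the zc.FileStorage package:

    import zc.FileStorage                        # importing the package applies the patch
    import ZODB.FileStorage.FileStorage as FS

    # FileStorage.pack() now builds the replacement packer, so the stock
    # ZODB pack tests run against zc.FileStorage unchanged.
    assert FS.FileStoragePacker is zc.FileStorage.FileStoragePacker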

-=-
Modified: zc.FileStorage/dev/src/zc/FileStorage/__init__.py
===================================================================
--- zc.FileStorage/dev/src/zc/FileStorage/__init__.py	2007-12-03 19:11:16 UTC (rev 82112)
+++ zc.FileStorage/dev/src/zc/FileStorage/__init__.py	2007-12-03 23:47:37 UTC (rev 82113)
@@ -12,85 +12,20 @@
 #
 ##############################################################################
 
+import sys
+
+from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
+from ZODB.serialize import referencesf
+from ZODB.utils import p64, u64, z64
+
+import BTrees.LLBTree, BTrees.LOBTree
 import ZODB.FileStorage
 import ZODB.FileStorage.fspack
-import BTrees.LLBTree
+import ZODB.fsIndex
 
-class FileStorage(ZODB.FileStorage.FileStorage):
+class FileStoragePacker(FileStorageFormatter):
 
-    def pack(self, t, referencesf):
-        """Copy data from the current database file to a packed file
-
-        Non-current records from transactions with time-stamp strings less
-        than packtss are omitted. As are all undone records.
-
-        Also, data back pointers that point before packtss are resolved and
-        the associated data are copied, since the old records are not copied.
-        """
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-
-        stop=`TimeStamp(*time.gmtime(t)[:5]+(t%60,))`
-        if stop==z64: raise FileStorageError('Invalid pack time')
-
-        # If the storage is empty, there's nothing to do.
-        if not self._index:
-            return
-
-        self._lock_acquire()
-        try:
-            if self._pack_is_in_progress:
-                raise FileStorageError('Already packing')
-            self._pack_is_in_progress = True
-            current_size = self.getSize()
-        finally:
-            self._lock_release()
-
-        p = FileStoragePacker(self._file_name, stop,
-                              self._lock_acquire, self._lock_release,
-                              self._commit_lock_acquire,
-                              self._commit_lock_release,
-                              current_size, referencesf)
-        try:
-            opos = None
-            try:
-                opos = p.pack()
-            except RedundantPackWarning, detail:
-                logger.info(str(detail))
-            if opos is None:
-                return
-            oldpath = self._file_name + ".old"
-            self._lock_acquire()
-            try:
-                self._file.close()
-                try:
-                    if os.path.exists(oldpath):
-                        os.remove(oldpath)
-                    os.rename(self._file_name, oldpath)
-                except Exception:
-                    self._file = open(self._file_name, 'r+b')
-                    raise
-
-                # OK, we're beyond the point of no return
-                os.rename(self._file_name + '.pack', self._file_name)
-                self._file = open(self._file_name, 'r+b')
-                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
-                                p.oid2tid, p.toid2tid,
-                                p.toid2tid_delete)
-                self._pos = opos
-                self._save_index()
-            finally:
-                self._lock_release()
-        finally:
-            if p.locked:
-                self._commit_lock_release()
-            self._lock_acquire()
-            self._pack_is_in_progress = False
-            self._lock_release()
-
-class FileStoragePacker(ZODB.FileStorage.fspack.FileStoragePacker):
-
-    def __init__(self, path, stop, la, lr, cla, clr, current_size, referencesf):
+    def __init__(self, path, stop, la, lr, cla, clr, current_size):
         self._name = path
         # We open our own handle on the storage so that much of pack can
         # proceed in parallel.  It's important to close this file at every
@@ -101,8 +36,6 @@
         # when the storage's disk is doing other io
         self._file = open(path, "rb", 1<<25)
 
-
-
         self._path = path
         self._stop = stop
         self.locked = 0
@@ -116,23 +49,12 @@
         self._commit_lock_acquire = cla
         self._commit_lock_release = clr
 
-        # The packer will use several indexes.
-        # index: oid -> pos
-        # tindex: oid -> pos, for current txn
-        # oid2tid: not used by the packer
+        self.ltid = z64
 
-        self.index = BTrees.fsBTree.fsIndex()
-        self.tindex = {}
-        self.oid2tid = {}
-        self.toid2tid = {}
-        self.toid2tid_delete = {}
-
-        self.referencesf = referencesf
-
     def pack(self):
         packed, index, references, packpos = self.buildPackIndex(
             self._stop, self.file_end)
-        is packed:
+        if packed:
             # nothing to do
             self._file.close()
             return None
@@ -151,42 +73,27 @@
 
         new_pos = self.copyFromPacktime(packpos, self.file_end, output, index)
         
-        self._commit_lock_acquire()
-        self.locked = 1
-        self._lock_acquire()
-        try:
-            # Re-open the file in unbuffered mode.
 
-            # The main thread may write new transactions to the file,
-            # which creates the possibility that we will read a status
-            # 'c' transaction into the pack thread's stdio buffer even
-            # though we're acquiring the commit lock.  Transactions
-            # can still be in progress throughout much of packing, and
-            # are written to the same physical file but via a distinct
-            # Python file object.  The code used to leave off the
-            # trailing 0 argument, and then on every platform except
-            # native Windows it was observed that we could read stale
-            # data from the tail end of the file.
-            self._file.close()  # else self.gc keeps the original alive & open
-            self._file = open(self._path, "rb", 0)
-            self._file.seek(0, 2)
-            self.file_end = self._file.tell()
-        finally:
-            self._lock_release()
-        if ipos < self.file_end:
-            self.copyRest(ipos)
-
         # OK, we've copied everything. Now we need to wrap things up.
-        pos = self._tfile.tell()
-        self._tfile.flush()
-        self._tfile.close()
+        pos = output.tell()
+        output.flush()
+        output.close()
         self._file.close()
 
+        # Grrrrr. The caller wants these attrs
+        self.index = index
+        self.vindex = {}
+        self.tindex = {}
+        self.tvindex = {}
+        self.oid2tid = {}
+        self.toid2tid = {}
+        self.toid2tid_delete = {}
+
         return pos
 
 
     def buildPackIndex(self, stop, file_end):
-        index = BTrees.fsBTree.fsIndex()
+        index = ZODB.fsIndex.fsIndex()
         references = BTrees.LOBTree.LOBTree()
         pos = 4L
         packed = True
@@ -208,9 +115,12 @@
                 if dh.version:
                     self.fail(pos, "Versions are not supported")
                 index[dh.oid] = pos
+                ioid = u64(dh.oid)
                 refs = self._refs(dh)
                 if refs:
-                    references[oid] = refs
+                    references[ioid] = refs
+                else:
+                    references.pop(ioid, None)
                 
                 pos += dh.recordlen()
 
@@ -237,10 +147,14 @@
                 self.checkData(th, tpos, dh, pos)
                 if dh.version:
                     self.fail(pos, "Versions are not supported")
-                refs = self._refs(dh, self.references.get(oid))
+                ioid = u64(dh.oid)
+                refs = self._refs(dh, references.get(ioid))
                 if refs:
-                    references[oid] = refs
+                    references[ioid] = refs
+                else:
+                    references.pop(ioid, None)
                 
+                
                 pos += dh.recordlen()
 
             tlen = self._read_num(pos)
@@ -258,7 +172,7 @@
         if not dh.plen:
             return initial
         
-        refs = self.referencesf(self._file.read(dh.plen))
+        refs = referencesf(self._file.read(dh.plen))
         if not refs:
             return initial
             
@@ -266,26 +180,28 @@
             initial = BTrees.LLBTree.LLSet()
         initial.update(u64(oid) for oid in refs)
         result = BTrees.LLBTree.LLSet()
-        result.__setstate___((tuple(initial),))
+        result.__setstate__((tuple(initial),))
         return result
 
     def gc(self, index, references):
         to_do = [0]
-        reachable = BTrees.fsBTree.fsIndex()
+        reachable = ZODB.fsIndex.fsIndex()
         while to_do:
             ioid = to_do.pop()
             oid = p64(ioid)
             if oid in reachable:
                 continue
-            reachable[oid] = index.pop(oid)
+
+            reachable[oid] = index[oid]
             to_do.extend(references.pop(ioid, ()))
         references.clear()
         return reachable
 
     def copyToPacktime(self, packpos, index, output):
         pos = new_pos = self._metadata_size
+        self._file.seek(0)
         output.write(self._file.read(self._metadata_size))
-        new_index = BTrees.fsBTree.fsIndex()
+        new_index = ZODB.fsIndex.fsIndex()
 
         while pos < packpos:
             th = self._read_txn_header(pos)
@@ -344,91 +260,106 @@
 
         return new_index, new_pos
 
-    def copyFromPacktime(self, pos, file_end, output, index):
-        while pos < file_end:
-            th = self._read_txn_header(pos)
-            new_tpos = output.tell()
-            output.write(th.asString())
-            tend = pos + th.tlen
-            pos += th.headerlen()
-            while pos < tend:
-                h = self._read_data_header(pos)
+    def copyFromPacktime(self, input_pos, file_end, output, index):
+        while input_pos < file_end:
+            input_pos = self._copyNewTrans(input_pos, output, index)
 
-                prev_txn = None
-                if h.plen:
-                    data = self._file.read(h.plen)
-                else:
-                    # If a current record has a backpointer, fetch
-                    # refs and data from the backpointer.  We need
-                    # to write the data in the new record.
-                    data = self.fetchBackpointer(h.oid, h.back)
-                    if h.back:
-                        prev_txn = self.getTxnFromData(h.oid, h.back)
+    def copyRest(self, input_pos, output, index):
+        # Copy data records written since packing started.
 
-                if h.version:
-                    self.fail(pos, "Versions are not supported.")
+        self._commit_lock_acquire()
+        self.locked = 1
+        # Re-open the file in unbuffered mode.
 
-                self._copier.copy(h.oid, h.tid, data, h.version, prev_txn,
-                                  new_tpos, output.tell())
+        # The main thread may write new transactions to the file,
+        # which creates the possibility that we will read a status
+        # 'c' transaction into the pack thread's stdio buffer even
+        # though we're acquiring the commit lock.  Transactions
+        # can still be in progress throughout much of packing, and
+        # are written to the same physical file but via a distinct
+        # Python file object.  The code used to leave off the
+        # trailing 0 argument, and then on every platform except
+        # native Windows it was observed that we could read stale
+        # data from the tail end of the file.
+        self._file.close()
+        self._file = open(self._path, "rb", 0)
 
+        try:
+            while 1:
+                # The call below will raise CorruptedDataError at EOF.
+                input_pos = self._copyNewTrans(
+                    input_pos, output, index,
+                    self._commit_lock_acquire, self._commit_lock_release)
+        except CorruptedDataError, err:
+            # The last call to copyOne() will raise
+            # CorruptedDataError, because it will attempt to read past
+            # the end of the file.  Double-check that the exception
+            # occurred for this reason.
+            self._file.seek(0, 2)
+            endpos = self._file.tell()
+            if endpos != err.pos:
+                raise
 
-                pos += h.recordlen()
-
-            new_pos = output.tell()
-            tlen = p64(new_pos - new_tpos)
-            output.write(tlen)
-            new_pos += 8
-
-            if tlen != h.tlen:
-                # Update the transaction length
-                output.seek(new_tpos + 8)
-                output.write(tlen)
-                output.seek(new_pos)
-
-            pos += 8
-
-        return new_index, new_pos
-
-
-
-    def copyOne(self, ipos):
-        # The call below will raise CorruptedDataError at EOF.
-        th = self._read_txn_header(ipos)
-        self._lock_counter += 1
-        if self._lock_counter % 20 == 0:
-            self._commit_lock_release()
-        pos = self._tfile.tell()
-        self._copier.setTxnPos(pos)
-        self._tfile.write(th.asString())
-        tend = ipos + th.tlen
-        ipos += th.headerlen()
-
-        while ipos < tend:
-            h = self._read_data_header(ipos)
-            ipos += h.recordlen()
+    def _copyNewTrans(self, input_pos, output, index,
+                      acquire=None, release=None):
+        tindex = {}
+        copier = ZODB.FileStorage.fspack.PackCopier(
+            output, index, {}, tindex, {},
+            )
+        th = self._read_txn_header(input_pos)
+        if release is not None:
+            release()
+            
+        output_tpos = output.tell()
+        copier.setTxnPos(output_tpos)
+        output.write(th.asString())
+        tend = input_pos + th.tlen
+        input_pos += th.headerlen()
+        while input_pos < tend:
+            h = self._read_data_header(input_pos)
             prev_txn = None
             if h.plen:
                 data = self._file.read(h.plen)
             else:
+                # If a current record has a backpointer, fetch
+                # refs and data from the backpointer.  We need
+                # to write the data in the new record.
                 data = self.fetchBackpointer(h.oid, h.back)
                 if h.back:
                     prev_txn = self.getTxnFromData(h.oid, h.back)
 
             if h.version:
-                self.fail(ipos, "Versions are not supported.")
+                self.fail(input_pos, "Versions are not supported.")
 
-            self._copier.copy(h.oid, h.tid, data, prev_txn, pos,
-                              self._tfile.tell())
+            copier.copy(h.oid, h.tid, data, '', prev_txn,
+                        output_tpos, output.tell())
 
-        tlen = self._tfile.tell() - pos
-        assert tlen == th.tlen
-        self._tfile.write(p64(tlen))
-        ipos += 8
+            input_pos += h.recordlen()
 
-        self.index.update(self.tindex)
-        self.tindex.clear()
-        if self._lock_counter % 20 == 0:
-            self._commit_lock_acquire()
-        return ipos
+        output_pos = output.tell()
+        tlen = p64(output_pos - output_tpos)
+        output.write(tlen)
+        output_pos += 8
 
+        if tlen != th.tlen:
+            # Update the transaction length
+            output.seek(output_tpos + 8)
+            output.write(tlen)
+            output.seek(output_pos)
 
+        index.update(tindex)
+        tindex.clear()
+
+        if acquire is not None:
+            acquire()
+
+        return input_pos + 8
+
+    def fetchBackpointer(self, oid, back):
+        if back == 0:
+            return None
+        data, tid = self._loadBackTxn(oid, back, 0)
+        return data
+
+sys.modules['ZODB.FileStorage.FileStorage'
+            ].FileStoragePacker = FileStoragePacker
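
The gc() method above is a plain reachability sweep from the root object (oid 0): each reachable oid has its file position copied into a fresh fsIndex, and its outgoing references (kept as 64-bit integers in the LOBTree built by buildPackIndex) are pushed onto the work list. A toy sketch of the same sweep, with a plain dict and list standing in for fsIndex and the BTrees:

    def sweep(index, references):
        # index: oid -> file position; references: oid -> referenced oids
        reachable = {}
        to_do = [0]                     # start from the root object, oid 0
        while to_do:
            oid = to_do.pop()
            if oid in reachable:
                continue
            reachable[oid] = index[oid]
            to_do.extend(references.pop(oid, ()))
        return reachable

    # sweep({0: 4, 1: 100, 2: 200}, {0: [1]}) keeps oids 0 and 1; oid 2 is garbage.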


