[Checkins] SVN: ZODB/trunk/src/ The FileStorage iterator now handles large files better. Whenm

Jim Fulton jim at zope.com
Mon Dec 22 17:48:31 EST 2008


Log message for revision 94254:
  The FileStorage iterator now handles large files better.  When
  iterating from a starting transaction near the end of the file, the
  iterator will scan backward from the end of the file to find the
  starting point.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
  A   ZODB/trunk/src/ZODB/FileStorage/iterator.test
  U   ZODB/trunk/src/ZODB/FileStorage/tests.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2008-12-22 21:51:07 UTC (rev 94253)
+++ ZODB/trunk/src/CHANGES.txt	2008-12-22 22:48:30 UTC (rev 94254)
@@ -42,6 +42,11 @@
 - As a small convenience (mainly for tests), you can now specify
   initial data as a string argument to the Blob constructor.
 
+- The FileStorage iterator now handles large files better.  When
+  iterating from a starting transaction near the end of the file, the
+  iterator will scan backward from the end of the file to find the
+  starting point.
+
 3.9.0a8 (2008-12-15)
 ====================
 
@@ -54,7 +59,7 @@
      blob-cache-size * (100 - blob-cache-size-check) / 100
 
   The makes it far more likely (but doesn't guarantee) that the blob
-  cache size will remain under the maximum.  
+  cache size will remain under the maximum.
 
   The blob-cache-size check was reduced to 10%.
 
@@ -79,7 +84,7 @@
   cache will periodically be reduced to the target size.
 
   The client blob directory layout has changed.  If you have existing
-  non-shared blob directories, you will have to remove them. 
+  non-shared blob directories, you will have to remove them.
 
 Bugs Fixed
 ----------
@@ -119,7 +124,7 @@
   you would otherwise pass to ZEO.ClientStorage.ClientStorage::
 
     import ZEO
-    db = ZEO.DB(('some_host', 8200)) 
+    db = ZEO.DB(('some_host', 8200))
 
 - Object saves are a little faster
 
@@ -297,7 +302,7 @@
 - Fixed bug 153316: persistent and BTrees were using `int`
   for memory sizes which caused errors on x86_64 Intel Xeon machines
   (using 64-bit Linux).
-  
+
 - Fixed small bug that the Connection.isReadOnly method didn't
   work after a savepoint.
 
@@ -390,7 +395,7 @@
 Bugs Fixed:
 
 - Fixed several bugs that caused ZEO cache corruption when connecting
-  to servers. These bugs affected both persistent and non-persistent caches. 
+  to servers. These bugs affected both persistent and non-persistent caches.
 
 - Improved the the ZEO client shutdown support to try to
   avoid spurious errors on exit, especially for scripts, such as zeopack.
@@ -416,7 +421,7 @@
 Bugs Fixed:
 
 - The cache used an excessive amount of memory, causing applications
-  with large caches to exhaust available memory. 
+  with large caches to exhaust available memory.
 
 3.8.1b1 (2008-05-08)
 ====================

Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-12-22 21:51:07 UTC (rev 94253)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-12-22 22:48:30 UTC (rev 94254)
@@ -21,7 +21,7 @@
 from struct import pack, unpack
 from types import StringType
 from zc.lockfile import LockFile
-from ZODB.FileStorage.format import CorruptedDataError
+from ZODB.FileStorage.format import CorruptedError, CorruptedDataError
 from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
 from ZODB.FileStorage.format import TRANS_HDR, TRANS_HDR_LEN
 from ZODB.FileStorage.format import TxnHeader, DATA_HDR, DATA_HDR_LEN
@@ -50,7 +50,6 @@
 
 logger = logging.getLogger('ZODB.FileStorage')
 
-
 def panic(message, *data):
     logger.critical(message, *data)
     raise CorruptedTransactionError(message)
@@ -210,7 +209,7 @@
             self.blob_dir = os.path.abspath(blob_dir)
             if create and os.path.exists(self.blob_dir):
                 ZODB.blob.remove_committed_dir(self.blob_dir)
-                
+
             self._blob_init(blob_dir)
             zope.interface.alsoProvides(self,
                                         ZODB.interfaces.IBlobStorageRestoreable)
@@ -484,7 +483,7 @@
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
         assert not version
-        
+
         self._lock_acquire()
         try:
             if oid > self._oid:
@@ -532,7 +531,7 @@
             raise POSException.ReadOnlyError()
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
-        
+
         self._lock_acquire()
         try:
             old = self._index_get(oid, 0)
@@ -544,7 +543,7 @@
             if oldserial != committed_tid:
                 raise POSException.ConflictError(
                     oid=oid, serials=(committed_tid, oldserial))
-                    
+
             pos = self._pos
             here = pos + self._tfile.tell() + self._thl
             self._tindex[oid] = here
@@ -748,7 +747,7 @@
     def _finish_finish(self, tid):
         # This is a separate method to allow tests to replace it with
         # something broken. :)
-        
+
         self._file.flush()
         if fsync is not None:
             fsync(self._file.fileno())
@@ -825,7 +824,7 @@
             # Eek, a later transaction modified the data, but,
             # maybe it is pointing at the same data we are.
             ctid, cdataptr, cdata = self._undoDataInfo(oid, ipos, tpos)
-            
+
             if cdataptr != pos:
                 # We aren't sure if we are talking about the same data
                 try:
@@ -994,7 +993,7 @@
                             self.openCommittedBlobFile(h.oid, userial),
                             open(tmp, 'wb'))
                         self._blob_storeblob(h.oid, self._tid, tmp)
-                
+
                 new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p))
 
                 # TODO:  This seek shouldn't be necessary, but some other
@@ -1177,7 +1176,7 @@
             # Helpers that remove an oid dir or revision file.
             handle_file = ZODB.blob.remove_committed
             handle_dir = ZODB.blob.remove_committed_dir
-            
+
         # Fist step: move or remove oids or revisions
         for line in open(os.path.join(self.blob_dir, '.removed')):
             line = line.strip().decode('hex')
@@ -1191,10 +1190,10 @@
                 handle_dir(path)
                 maybe_remove_empty_dir_containing(path)
                 continue
-            
+
             if len(line) != 16:
                 raise ValueError("Bad record in ", self.blob_dir, '.removed')
-            
+
             oid, tid = line[:8], line[8:]
             path = fshelper.getBlobFilename(oid, tid)
             if not os.path.exists(path):
@@ -1208,7 +1207,7 @@
 
         if not self.pack_keep_old:
             return
-            
+
         # Second step, copy remaining files.
         for path, dir_names, file_names in os.walk(self.blob_dir):
             for file_name in file_names:
@@ -1219,7 +1218,7 @@
                 if not os.path.exists(dest):
                     os.makedirs(dest, 0700)
                 link_or_copy(file_path, old+file_path[lblob_dir:])
-        
+
     def iterator(self, start=None, stop=None):
         return FileIterator(self._file_name, start, stop)
 
@@ -1244,8 +1243,8 @@
                     for trans in FileIterator(self._file_name, pos=pos)]
         finally:
             self._lock_release()
-        
 
+
     def lastTid(self, oid):
         """Return last serialno committed for object oid.
 
@@ -1641,16 +1640,23 @@
         assert isinstance(filename, str)
         file = open(filename, 'rb')
         self._file = file
+        self._file_name = filename
         if file.read(4) != packed_version:
             raise FileStorageFormatError(file.name)
         file.seek(0,2)
         self._file_size = file.tell()
+        if (pos < 4) or pos > self._file_size:
+            raise ValueError("Given position is greater than the file size",
+                             pos, self._file_size)
         self._pos = pos
         assert start is None or isinstance(start, str)
         assert stop is None or isinstance(stop, str)
+        self._start = start
+        self._stop = stop
         if start:
+            if self._file_size <= 4:
+                return
             self._skip_to_start(start)
-        self._stop = stop
 
     def __len__(self):
         # Define a bogus __len__() to make the iterator work
@@ -1674,32 +1680,87 @@
             file.close()
 
     def _skip_to_start(self, start):
-        # Scan through the transaction records doing almost no sanity
-        # checks.
         file = self._file
+        pos1 = self._pos
+        file.seek(pos1)
+        tid1 = file.read(8)
+        if len(tid1) < 8:
+            raise CorruptedError("Couldn't read tid.")
+        if start < tid1:
+            pos2 = pos1
+            tid2 = tid1
+            file.seek(4)
+            tid1 = file.read(8)
+            if start <= tid1:
+                self._pos = 4
+                return
+            pos1 = 4
+        else:
+            if start == tid1:
+                return
+
+            # Try to read the last transaction. We could be unlucky and
+            # opened the file while committing a transaction.  In that
+            # case, we'll just scan from the beginning if the file is
+            # small enough, otherwise we'll fail.
+            file.seek(self._file_size-8)
+            l = u64(file.read(8))
+            if not (l + 12 <= self._file_size and
+                    self._read_num(self._file_size-l) == l):
+                if self._file_size < (1<<20):
+                    return self._scan_foreward(start)
+                raise ValueError("Can't find last transaction in large file")
+            pos2 = self._file_size-l-8
+            file.seek(pos2)
+            tid2 = file.read(8)
+            if tid2 < tid1:
+                raise CorruptedError("Tids out of order.")
+            if tid2 <= start:
+                if tid2 == start:
+                    self._pos = pos2
+                else:
+                    self._pos = self._file_size
+                return
+
+        t1 = ZODB.TimeStamp.TimeStamp(tid1).timeTime()
+        t2 = ZODB.TimeStamp.TimeStamp(tid2).timeTime()
+        ts = ZODB.TimeStamp.TimeStamp(start).timeTime()
+        if (ts - t1) < (t2 - ts):
+            return self._scan_forward(pos1, start)
+        else:
+            return self._scan_backward(pos2, start)
+
+    def _scan_forward(self, pos, start):
+        logger.debug("Scan forward %s:%s looking for %r",
+                     self._file_name, pos, start)
+        file = self._file
+        while 1:
+            # Read the transaction record
+            h = self._read_txn_header(pos)
+            if h.tid >= start:
+                self._pos = pos
+                return
+
+            pos += h.tlen + 8
+
+    def _scan_backward(self, pos, start):
+        logger.debug("Scan backward %s:%s looking for %r",
+                     self._file_name, pos, start)
+        file = self._file
+        seek = file.seek
         read = file.read
-        seek = file.seek
         while 1:
-            seek(self._pos)
-            h = read(16)
-            if len(h) < 16:
+            pos -= 8
+            seek(pos)
+            tlen = ZODB.utils.u64(read(8))
+            pos -= tlen
+            h = self._read_txn_header(pos)
+            if h.tid <= start:
+                if h.tid == start:
+                    self._pos = pos
+                else:
+                    self._pos = pos + tlen + 8
                 return
-            tid, stl = unpack(">8s8s", h)
-            if tid >= start:
-                return
-            tl = u64(stl)
-            try:
-                self._pos += tl + 8
-            except OverflowError:
-                self._pos = long(self._pos) + tl + 8
-            if __debug__:
-                # Sanity check
-                seek(self._pos - 8, 0)
-                rtl = read(8)
-                if rtl != stl:
-                    pos = file.tell() - 8
-                    panic("%s has inconsistent transaction length at %s "
-                          "(%s != %s)", file.name, pos, u64(rtl), u64(stl))
 
     # Iterator protocol
     def __iter__(self):

Added: ZODB/trunk/src/ZODB/FileStorage/iterator.test
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/iterator.test	                        (rev 0)
+++ ZODB/trunk/src/ZODB/FileStorage/iterator.test	2008-12-22 22:48:30 UTC (rev 94254)
@@ -0,0 +1,142 @@
+FileStorage-specific iterator tests
+===================================
+
+The FileStorage iterator has some special features that deserve some
+special tests.
+
+We'll make some assertions about time, so we'll take it over:
+
+    >>> now = 1229959248
+    >>> def faux_time():
+    ...     global now
+    ...     now += 0.1
+    ...     return now
+    >>> import time
+    >>> time_time = time.time
+    >>> time.time = faux_time
+
+Commit a bunch of transactions:
+
+    >>> import ZODB.FileStorage, transaction
+    >>> db = ZODB.DB('data.fs')
+    >>> tids = [db.storage.lastTransaction()]
+    >>> poss = [db.storage._pos]
+    >>> conn = db.open()
+    >>> for i in range(100):
+    ...     conn.root()[i] = conn.root().__class__()
+    ...     transaction.commit()
+    ...     tids.append(db.storage.lastTransaction())
+    ...     poss.append(db.storage._pos)
+
+Deciding where to start
+-----------------------
+
+By default, we start at the beginning:
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs')
+    >>> it.next().tid == tids[0]
+    True
+
+The file iterator has an optimization to deal with large files.  It
+can search from either the front or the back of the file, depending
+on the starting transaction given.  To see this, we'll turn on debug
+logging:
+
+    >>> import logging, sys
+    >>> old_log_level = logging.getLogger().getEffectiveLevel()
+    >>> logging.getLogger().setLevel(logging.DEBUG)
+    >>> handler = logging.StreamHandler(sys.stdout)
+    >>> logging.getLogger().addHandler(handler)
+
+If we specify a start transaction, we'll scan forward or backward, as
+seems best and set the next record to that:
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[0])
+    >>> it.next().tid == tids[0]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[1])
+    Scan forward data.fs:4 looking for '\x03z\xbd\xd8\xd06\x9c\xcc'
+    >>> it.next().tid == tids[1]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[30])
+    Scan forward data.fs:4 looking for '\x03z\xbd\xd8\xdc\x96.\xcc'
+    >>> it.next().tid == tids[30]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[70])
+    Scan backward data.fs:118274 looking for '\x03z\xbd\xd8\xed\xa7>\xcc'
+    >>> it.next().tid == tids[70]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-2])
+    Scan backward data.fs:118274 looking for '\x03z\xbd\xd8\xfa\x06\xd0\xcc'
+    >>> it.next().tid == tids[-2]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-1])
+    >>> it.next().tid == tids[-1]
+    True
+
+We can also supply a file position.  This can speed up finding the
+starting point, or just pick up where another iterator left off:
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', pos=poss[50])
+    >>> it.next().tid == tids[51]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[0], pos=4)
+    >>> it.next().tid == tids[0]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-1], pos=poss[-2])
+    >>> it.next().tid == tids[-1]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[50], pos=poss[50])
+    Scan backward data.fs:36542 looking for '\x03z\xbd\xd8\xe5\x1e\xb6\xcc'
+    >>> it.next().tid == tids[50]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[49], pos=poss[50])
+    Scan backward data.fs:36542 looking for '\x03z\xbd\xd8\xe4\xb1|\xcc'
+    >>> it.next().tid == tids[49]
+    True
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', tids[51], pos=poss[50])
+    >>> it.next().tid == tids[51]
+    True
+
+    >>> logging.getLogger().setLevel(old_log_level)
+    >>> logging.getLogger().removeHandler(handler)
+
+
+If a starting transaction is before the first transaction in the file,
+then the first transaction is returned.
+
+    >>> from ZODB.utils import p64, u64
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[0])-1))
+    >>> it.next().tid == tids[0]
+    True
+
+If it is after the last transaction, then iteration will be empty:
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[-1])+1))
+    >>> list(it)
+    []
+
+Even if we write more transactions:
+
+    >>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[-1])+1))
+    >>> for i in range(10):
+    ...     conn.root()[i] = conn.root().__class__()
+    ...     transaction.commit()
+    >>> list(it)
+    []
+
+.. Cleanup
+
+    >>> time.time = time_time
+    >>> it.close()
+    >>> db.close()


Property changes on: ZODB/trunk/src/ZODB/FileStorage/iterator.test
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: ZODB/trunk/src/ZODB/FileStorage/tests.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/tests.py	2008-12-22 21:51:07 UTC (rev 94253)
+++ ZODB/trunk/src/ZODB/FileStorage/tests.py	2008-12-22 22:48:30 UTC (rev 94254)
@@ -93,7 +93,7 @@
 def test_suite():
     return unittest.TestSuite((
         doctest.DocFileSuite(
-            'zconfig.txt',
+            'zconfig.txt', 'iterator.test',
             setUp=ZODB.tests.util.setUp, tearDown=ZODB.tests.util.tearDown,
             ),
         doctest.DocTestSuite(



More information about the Checkins mailing list