[Zodb-checkins] CVS: ZODB3/ZODB - fspack.py:1.8.6.4 FileStorage.py:1.135.4.2

Jeremy Hylton jeremy at zope.com
Tue Sep 9 00:09:20 EDT 2003


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv12358/ZODB

Modified Files:
      Tag: ZODB3-3_2-branch
	fspack.py FileStorage.py 
Log Message:
Port the oid cache from ZODB 3.1.
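
The point of the cache, spelled out in the new comments below: store()
needs to compare the serial a client sends against the current serial on
disk, and without a cache that costs one random seek + read per stored
object.  At the roughly 7ms per seek measured in mid-2003, a 4000-object
transaction spent about 28 seconds just fetching old serials.  A minimal
sketch of the uncached lookup, assuming only a FileStorage-style record
layout (8-byte oid followed by 8-byte serialno) and an in-memory index
mapping oid to file offset; names are illustrative, not ZODB API:

    def current_serial_uncached(f, index, oid):
        # index: in-memory map of oid -> offset of its current data record.
        pos = index[oid]
        f.seek(pos)                # the random seek is the expensive part
        h = f.read(16)             # 8-byte oid + 8-byte serialno
        if len(h) < 16 or h[:8] != oid:
            raise ValueError("corrupted data record for %r" % (oid,))
        return h[8:]               # the current serialno, as raw 8 bytes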


=== ZODB3/ZODB/fspack.py 1.8.6.3 => 1.8.6.4 ===
--- ZODB3/ZODB/fspack.py:1.8.6.3	Mon Sep  8 22:40:35 2003
+++ ZODB3/ZODB/fspack.py	Mon Sep  8 23:09:18 2003
@@ -647,11 +647,15 @@
         # vindex: version -> pos of XXX
         # tindex: oid -> pos, for current txn
         # tvindex: version -> pos of XXX, for current txn
+        # oid2serial: not used by the packer
         
         self.index = fsIndex()
         self.vindex = {}
         self.tindex = {}
         self.tvindex = {}
+        self.oid2serial = {}
+        self.toid2serial = {}
+        self.toid2serial_delete = {}
 
         # Index for non-version data.  This is a temporary structure
         # to reduce I/O during packing
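
The three new maps split the cache into committed state and
per-transaction state.  A minimal standalone sketch of that discipline
(illustrative names, not the ZODB API): adds and evictions are staged in
the t* maps while a transaction runs, folded into oid2serial only at
commit (mirroring the index-update and _clear_temp() hunks in
FileStorage.py below), and simply cleared on abort, so a failed
transaction can never leave stale serials behind.

    class SerialCache:
        def __init__(self):
            self.oid2serial = {}          # committed: oid -> current serialno
            self.toid2serial = {}         # adds staged by the current txn
            self.toid2serial_delete = {}  # evictions staged by the current txn

        def stage(self, oid, serial, is_version_data):
            if is_version_data:
                # Version records are never cached; evict any stale entry.
                self.toid2serial_delete[oid] = 1
            else:
                self.toid2serial[oid] = serial

        def commit(self):
            # At commit: apply staged adds, then staged evictions.
            self.oid2serial.update(self.toid2serial)
            for oid in self.toid2serial_delete.keys():
                self.oid2serial.pop(oid, None)
            self.abort()

        def abort(self):
            # On abort: per-transaction state just vanishes.
            self.toid2serial.clear()
            self.toid2serial_delete.clear()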


=== ZODB3/ZODB/FileStorage.py 1.135.4.1 => 1.135.4.2 ===
--- ZODB3/ZODB/FileStorage.py:1.135.4.1	Wed Sep  3 16:06:02 2003
+++ ZODB3/ZODB/FileStorage.py	Mon Sep  8 23:09:18 2003
@@ -157,17 +157,21 @@
 assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
 assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
 
+def blather(message, *data):
+    LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
+                                                  message % data))
+
 def warn(message, *data):
     LOG('ZODB FS', WARNING, "%s  warn: %s\n" % (packed_version,
-                                                (message % data)))
+                                                message % data))
 
 def error(message, *data):
     LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
-                                              (message % data)))
+                                              message % data))
 
 def nearPanic(message, *data):
     LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
-                                              (message % data)))
+                                              message % data))
 
 def panic(message, *data):
     message = message % data
@@ -234,8 +238,10 @@
 
         BaseStorage.BaseStorage.__init__(self, file_name)
 
-        index, vindex, tindex, tvindex = self._newIndexes()
-        self._initIndex(index, vindex, tindex, tvindex)
+        (index, vindex, tindex, tvindex,
+         oid2serial, toid2serial, toid2serial_delete) = self._newIndexes()
+        self._initIndex(index, vindex, tindex, tvindex,
+                        oid2serial, toid2serial, toid2serial_delete)
 
         # Now open the file
 
@@ -269,7 +275,8 @@
             self._used_index = 1 # Marker for testing
             index, vindex, start, maxoid, ltid = r
 
-            self._initIndex(index, vindex, tindex, tvindex)
+            self._initIndex(index, vindex, tindex, tvindex,
+                            oid2serial, toid2serial, toid2serial_delete)
             self._pos, self._oid, tid = read_index(
                 self._file, file_name, index, vindex, tindex, stop,
                 ltid=ltid, start=start, maxoid=maxoid,
@@ -302,7 +309,11 @@
 
         self._quota = quota
 
-    def _initIndex(self, index, vindex, tindex, tvindex):
+        # Serialno cache statistics.
+        self._oid2serial_nlookups = self._oid2serial_nhits = 0
+
+    def _initIndex(self, index, vindex, tindex, tvindex,
+                   oid2serial, toid2serial, toid2serial_delete):
         self._index=index
         self._vindex=vindex
         self._tindex=tindex
@@ -310,12 +321,33 @@
         self._index_get=index.get
         self._vindex_get=vindex.get
 
+        # .store() needs to compare the passed-in serial to the current
+        # serial in the database.  _oid2serial caches the oid -> current
+        # serial mapping for non-version data (if the current record for
+        # oid is version data, the oid is not a key in _oid2serial).
+        # The point is that otherwise seeking into the storage is needed
+        # to extract the current serial, and that's an expensive operation.
+        # For example, if a transaction stores 4000 objects, and each
+        # random seek + read takes 7ms (that was approximately true on
+        # Linux and Windows tests in mid-2003), that's 28 seconds just to
+        # find the old serials.
+        # XXX Probably better to junk this and redefine _index as mapping
+        # XXX oid to (offset, serialno) pair, via a new memory-efficient
+        # XXX BTree type.
+        self._oid2serial = oid2serial
+        # oid->serialno map to transactionally add to _oid2serial.
+        self._toid2serial = toid2serial
+        # Set of oids to transactionally delete from _oid2serial (e.g.,
+        # oids reverted by undo, or for which the most recent record
+        # becomes version data).
+        self._toid2serial_delete = toid2serial_delete
+
     def __len__(self):
         return len(self._index)
 
     def _newIndexes(self):
         # hook to use something other than builtin dict
-        return fsIndex(), {}, {}, {}
+        return fsIndex(), {}, {}, {}, {}, {}, {}
 
     _saved = 0
     def _save_index(self):
@@ -483,6 +515,31 @@
             # XXX should log the error, though
             pass # We don't care if this fails.
 
+    # Return serial number of most recent record for oid if that's in
+    # the _oid2serial cache.  Else return None.  It's important to use
+    # this instead of indexing _oid2serial directly so that cache
+    # statistics can be logged.
+    def _get_cached_serial(self, oid):
+        self._oid2serial_nlookups += 1
+        result = self._oid2serial.get(oid)
+        if result is not None:
+            self._oid2serial_nhits += 1
+
+        # Log a msg every ~8000 tries, and prevent overflow.
+        if self._oid2serial_nlookups & 0x1fff == 0:
+            if self._oid2serial_nlookups >> 30:
+                # In older Pythons, we may overflow if we keep it an int.
+                self._oid2serial_nlookups = long(self._oid2serial_nlookups)
+                self._oid2serial_nhits = long(self._oid2serial_nhits)
+            blather("_oid2serial size %s lookups %s hits %s rate %.1f%%",
+                    len(self._oid2serial),
+                    self._oid2serial_nlookups,
+                    self._oid2serial_nhits,
+                    100.0 * self._oid2serial_nhits /
+                            self._oid2serial_nlookups)
+
+        return result
+
     def abortVersion(self, src, transaction):
         return self.commitVersion(src, '', transaction, abort=1)
 
@@ -585,9 +642,11 @@
 
             spos = h[-8:]
             srcpos = u64(spos)
+        self._toid2serial_delete.update(current_oids)
         return oids
 
-    def getSize(self): return self._pos
+    def getSize(self):
+        return self._pos
 
     def _loada(self, oid, _index, file):
         "Read any version and return the version"
@@ -632,6 +691,10 @@
                 (read(8) # skip past version link
                  and version != read(vlen))):
                 return _loadBack(file, oid, pnv)
+        else:
+            # The most recent record is for non-version data -- cache
+            # the serialno.
+            self._oid2serial[oid] = serial
 
         # If we get here, then either this was not a version record,
         # or we've already read past the version data!
@@ -713,20 +776,25 @@
 
         self._lock_acquire()
         try:
-            old=self._index_get(oid, 0)
-            pnv=None
+            old = self._index_get(oid, 0)
+            cached_serial = None
+            pnv = None
             if old:
-                self._file.seek(old)
-                h=self._file.read(DATA_HDR_LEN)
-                doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
-                if doid != oid: raise CorruptedDataError(h)
-                if vlen:
-                    pnv=self._file.read(8) # non-version data pointer
-                    self._file.read(8) # skip past version link
-                    locked_version=self._file.read(vlen)
-                    if version != locked_version:
-                        raise POSException.VersionLockError, (
-                            `oid`, locked_version)
+                cached_serial = self._get_cached_serial(oid)
+                if cached_serial is None:
+                    self._file.seek(old)
+                    h=self._file.read(DATA_HDR_LEN)
+                    doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
+                    if doid != oid: raise CorruptedDataError(h)
+                    if vlen:
+                        pnv=self._file.read(8) # non-version data pointer
+                        self._file.read(8) # skip past version link
+                        locked_version=self._file.read(vlen)
+                        if version != locked_version:
+                            raise POSException.VersionLockError, (
+                                `oid`, locked_version)
+                else:
+                    oserial = cached_serial
 
                 if serial != oserial:
                     data = self.tryToResolveConflict(oid, oserial, serial,
@@ -749,14 +817,19 @@
                        )
                   )
             if version:
-                if pnv: write(pnv)
-                else:   write(p64(old))
+                if pnv:
+                    write(pnv)
+                else:
+                    write(p64(old))
                 # Link to last record for this version:
                 tvindex=self._tvindex
                 pv=tvindex.get(version, 0) or self._vindex_get(version, 0)
                 write(p64(pv))
                 tvindex[version]=here
                 write(version)
+                self._toid2serial_delete[oid] = 1
+            else:
+                self._toid2serial[oid] = newserial
 
             write(data)
 
@@ -875,7 +948,11 @@
                 self._tfile.write(p64(pv))
                 self._tvindex[version] = here
                 self._tfile.write(version)
-            # And finally, write the data or a backpointer
+                self._toid2serial_delete[oid] = 1
+            else:
+                self._toid2serial[oid] = serial
+
+            # Finally, write the data or a backpointer.
             if data is None:
                 if prev_pos:
                     self._tfile.write(p64(prev_pos))
@@ -940,6 +1017,8 @@
     def _clear_temp(self):
         self._tindex.clear()
         self._tvindex.clear()
+        self._toid2serial.clear()
+        self._toid2serial_delete.clear()
         if self._tfile is not None:
             self._tfile.seek(0)
 
@@ -1023,6 +1102,12 @@
             
             self._index.update(self._tindex)
             self._vindex.update(self._tvindex)
+            self._oid2serial.update(self._toid2serial)
+            for oid in self._toid2serial_delete.keys():
+                try:
+                    del self._oid2serial[oid]
+                except KeyError:
+                    pass
             
             # Update the number of records that we've written
             # +1 for the transaction record
@@ -1090,21 +1175,28 @@
     def getSerial(self, oid):
         self._lock_acquire()
         try:
-            try:
-                return self._getSerial(oid, self._index[oid])
-            except KeyError:
-                raise POSKeyError(oid)
-            except TypeError:
-                raise TypeError, 'invalid oid %r' % (oid,)
+            result = self._get_cached_serial(oid)
+            if result is None:
+                try:
+                    result = self._getSerial(oid, self._index[oid])
+                except KeyError:
+                    raise POSKeyError(oid)
+                except TypeError:
+                    raise TypeError, 'invalid oid %r' % (oid,)
+            return result
         finally:
             self._lock_release()
 
     def _getSerial(self, oid, pos):
         self._file.seek(pos)
-        h = self._file.read(DATA_HDR_LEN)
+        h = self._file.read(16)
+        if len(h) < 16:
+            raise CorruptedDataError(h)
+        h += self._file.read(26) # get rest of header
+        if h[:8] != oid:
+            raise CorruptedDataError(h)
         oid2, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
-        assert oid == oid2
-        if splen==z64:
+        if splen == z64:
             # a back pointer
             bp = self._file.read(8)
             if bp == z64:
@@ -1243,6 +1335,10 @@
         tpos = self._txn_find(tid, 1)
         tindex = self._txn_undo_write(tpos)
         self._tindex.update(tindex)
+        # Arrange to clear the affected oids from the oid2serial cache.
+        # It's too painful to try to update them to correct current
+        # values instead.
+        self._toid2serial_delete.update(tindex)
         return tindex.keys()
 
     def _txn_find(self, tid, stop_at_pack):
@@ -1500,7 +1596,9 @@
                 # OK, we're beyond the point of no return
                 os.rename(self._file_name + '.pack', self._file_name)
                 self._file = open(self._file_name, 'r+b')
-                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
+                                p.oid2serial, p.toid2serial,
+                                p.toid2serial_delete)
                 self._pos = opos
                 self._save_index()
             finally:
@@ -1526,20 +1624,9 @@
         if it is a new object -- return None.
         """
         try:
-            pos = self._index[oid]
+            return self.getSerial(oid)
         except KeyError:
             return None
-        except TypeError:
-            raise TypeError, 'invalid oid %r' % (oid,)
-        self._file.seek(pos)
-        # first 8 bytes are oid, second 8 bytes are serialno
-        h = self._file.read(16)
-        if len(h) < 16:
-            raise CorruptedDataError(h)
-        if h[:8] != oid:
-            h = h + self._file.read(26) # get rest of header
-            raise CorruptedDataError(h)
-        return h[8:]
 
     def cleanup(self):
         """Remove all files created by this storage."""



