[Checkins] SVN: ZODB/branches/3.8/src/ZEO/ Combined the ZEO client cache and file classes as a first step in a

Jim Fulton jim at zope.com
Sun May 11 11:15:14 EDT 2008


Log message for revision 86650:
  Combined the ZEO client cache and file classes as a first step in a
  refactoring to simplify the data structures, in order to fix a serious
  memory bug: the cache uses far too much memory.
  

Changed:
  U   ZODB/branches/3.8/src/ZEO/cache.py
  U   ZODB/branches/3.8/src/ZEO/tests/test_cache.py

-=-
Modified: ZODB/branches/3.8/src/ZEO/cache.py
===================================================================
--- ZODB/branches/3.8/src/ZEO/cache.py	2008-05-11 09:48:43 UTC (rev 86649)
+++ ZODB/branches/3.8/src/ZEO/cache.py	2008-05-11 15:15:12 UTC (rev 86650)
@@ -65,6 +65,58 @@
 # full verification
 # <p>
 
+
+##
+# FileCache stores a cache in a single on-disk file.
+#
+# On-disk cache structure.
+#
+# The file begins with a 12-byte header.  The first four bytes are the
+# file's magic number - ZEC3 - indicating zeo cache version 3.  The
+# next eight bytes are the last transaction id.
+
+magic = "ZEC3"
+ZEC3_HEADER_SIZE = 12
+
+# After the header, the file contains a contiguous sequence of blocks.  All
+# blocks begin with a one-byte status indicator:
+#
+# 'a'
+#       Allocated.  The block holds an object; the next 4 bytes are >I
+#       format total block size.
+#
+# 'f'
+#       Free.  The block is free; the next 4 bytes are >I format total
+#       block size.
+#
+# '1', '2', '3', '4'
+#       The block is free, and consists of 1, 2, 3 or 4 bytes total.
+#
+# "Total" includes the status byte, and size bytes.  There are no
+# empty (size 0) blocks.
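+#
+# The single-digit statuses cover gaps too small to hold a regular
+# free-block header: an 'f' block needs at least 5 bytes (the status byte
+# plus a 4-byte size), so a leftover of 1-4 bytes is recorded with just
+# its size digit as the status.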
+
+
+# Allocated blocks have more structure:
+#
+#     1 byte allocation status ('a').
+#     4 bytes block size, >I format.
+#     16 bytes oid + tid, string.
+#     size-OBJECT_HEADER_SIZE bytes, the serialization of an Object (see
+#         class Object for details).
+
+OBJECT_HEADER_SIZE = 1 + 4 + 16
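+
+# For illustration only, a hypothetical allocated-block header for a
+# 100-byte block keyed by an all-zero oid and tid could be built and
+# parsed like this (the names below are not used elsewhere in this file):
+#
+#     header = 'a' + struct.pack(">I", 100) + "\0" * 8 + "\0" * 8
+#     assert len(header) == OBJECT_HEADER_SIZE
+#     size, rawkey = struct.unpack(">I16s", header[1:])
+#     oid, tid = rawkey[:8], rawkey[8:]    # the (oid, start_tid) key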
+
+# The cache's currentofs goes around the file, circularly, forever.
+# It's always the starting offset of some block.
+#
+# When a new object is added to the cache, it's stored beginning at
+# currentofs, and currentofs moves just beyond it.  As many contiguous
+# blocks as needed to make enough room for the new object are evicted,
+# starting at currentofs.  Exception:  if currentofs is close enough
+# to the end of the file that the new object can't fit in one
+# contiguous chunk, currentofs is reset to ZEC3_HEADER_SIZE first.
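+#
+# For example, with a hypothetical 1000-byte cache file, currentofs at
+# 990, and a new 50-byte block to store, the 10 bytes left before
+# end-of-file aren't enough, so currentofs is first reset to
+# ZEC3_HEADER_SIZE (12) and eviction starts from there.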
+
+
 class ClientCache(object):
     """A simple in-memory cache."""
 
@@ -78,9 +130,16 @@
     # ClientStorage is the only user of ClientCache, and it always passes an
     # explicit size of its own choosing.
     def __init__(self, path=None, size=200*1024**2):
+
+        # - `path`:  filepath for the cache file, or None (in which case
+        #   a temp file will be created)
         self.path = path
-        self.size = size
 
+        # - `maxsize`:  total size of the cache file, in bytes; this is
+        #   ignored if path names an existing file; perhaps we should attempt
+        #   to change the cache size in that case
+        self.maxsize = size
+
         # The cache stores objects in a dict mapping (oid, tid) pairs
         # to Object() records (see below).  The tid is the transaction
         # id that wrote the object.  An object record includes data,
@@ -103,14 +162,426 @@
         # is not modified in a version.
         self.version = {}
 
-        # A FileCache instance does all the low-level work of storing
-        # and retrieving objects to/from the cache file.
-        self.fc = FileCache(size, self.path, self)
+        # tid for the most recent transaction we know about.  This is also
+        # stored near the start of the file.
+        self.tid = None
 
-        self._setup_trace(self.path)
+        # There's one Entry instance, kept in memory, for each currently
+        # allocated block in the file, and there's one allocated block in the
+        # file per serialized Object.  filemap retrieves the Entry given the
+        # starting offset of a block, and key2entry retrieves the Entry given
+        # an object revision's key (an (oid, start_tid) pair).  From an
+        # Entry, we can get the Object's key and file offset.
 
+        # Map offset in file to pair (data record size, Entry).
+        # Entry is None iff the block starting at offset is free.
+        # filemap always contains a complete account of what's in the
+        # file -- study method _verify_filemap for executable checking
+        # of the relevant invariants.  An offset is at the start of a
+        # block iff it's a key in filemap.  The data record size is
+        # stored in the file too, so we could just seek to the offset
+        # and read it up; keeping it in memory is an optimization.
+        self.filemap = {}
+
+        # Map key to Entry.  After
+        #     obj = key2entry[key]
+        # then
+        #     obj.key == key
+        # is true.  An object is currently stored on disk iff its key is in
+        # key2entry.
+        self.key2entry = {}
+
+        # Always the offset into the file of the start of a block.
+        # New and relocated objects are always written starting at
+        # currentofs.
+        self.currentofs = ZEC3_HEADER_SIZE
+
+        # self.f is the open file object.
+        # When we're not reusing an existing file, self.f is left None
+        # here -- the scan() method must be called then to open the file
+        # (and it sets self.f).
+
+        if path:
+            self._lock_file = ZODB.lock_file.LockFile(path + '.lock')
+        
+        if path and os.path.exists(path):
+            # Reuse an existing file.  scan() will open & read it.
+            self.f = None
+            logger.info("reusing persistent cache file %r", path)
+        else:
+            if path:
+                self.f = open(path, 'wb+')
+                logger.info("created persistent cache file %r", path)
+            else:
+                self.f = tempfile.TemporaryFile()
+                logger.info("created temporary cache file %r", self.f.name)
+            # Make sure the OS really reserves enough bytes for the file.
+            self.f.seek(self.maxsize - 1)
+            self.f.write('x')
+            self.f.truncate()
+            # Start with one magic header block
+            self.f.seek(0)
+            self.f.write(magic)
+            self.f.write(z64)
+            # and one free block.
+            self.f.write('f' + struct.pack(">I", self.maxsize -
+                                                 ZEC3_HEADER_SIZE))
+            sync(self.f)
+            self.filemap[ZEC3_HEADER_SIZE] = (self.maxsize - ZEC3_HEADER_SIZE,
+                                              None)
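+            # For a hypothetical 1000-byte cache file, the fresh layout is
+            # therefore:
+            #   offset  0: "ZEC3" magic (4 bytes)
+            #   offset  4: z64, the last-tid placeholder (8 bytes)
+            #   offset 12: 'f' + struct.pack(">I", 988), one free block
+            #              covering the remaining 1000 - 12 bytes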
+
+        # Statistics:  _n_adds, _n_added_bytes,
+        #              _n_evicts, _n_evicted_bytes,
+        #              _n_accesses
+        self.clearStats()
+
+        self._setup_trace(path)
+
+
+    ##
+    # Scan the current contents of the cache file, calling `install`
+    # for each object found in the cache.  This method should only
+    # be called once to initialize the cache from disk.
+    def scan(self, install):
+        if self.f is not None:  # we're not (re)using a pre-existing file
+            return
+        fsize = os.path.getsize(self.path)
+        if fsize != self.maxsize:
+            logger.warning("existing cache file %r has size %d; "
+                           "requested size %d ignored", self.path,
+                           fsize, self.maxsize)
+            self.maxsize = fsize
+        self.f = open(self.path, 'rb+')
+        _magic = self.f.read(4)
+        if _magic != magic:
+            raise ValueError("unexpected magic number: %r" % _magic)
+        self.tid = self.f.read(8)
+        if len(self.tid) != 8:
+            raise ValueError("cache file too small -- no tid at start")
+
+        # Populate .filemap and .key2entry to reflect what's currently in the
+        # file, and tell our parent about it too (via the `install` callback).
+        # Remember the location of the largest free block.  That seems a
+        # decent place to start currentofs.
+        max_free_size = 0
+        ofs = max_free_offset = ZEC3_HEADER_SIZE
+        while ofs < fsize:
+            self.f.seek(ofs)
+            ent = None
+            status = self.f.read(1)
+            if status == 'a':
+                size, rawkey = struct.unpack(">I16s", self.f.read(20))
+                key = rawkey[:8], rawkey[8:]
+                assert key not in self.key2entry
+                self.key2entry[key] = ent = Entry(key, ofs)
+                install(self.f, ent)
+            elif status == 'f':
+                size, = struct.unpack(">I", self.f.read(4))
+            elif status in '1234':
+                size = int(status)
+            else:
+                raise ValueError("unknown status byte value %s in client "
+                                 "cache file" % hex(ord(status)))
+
+            self.filemap[ofs] = size, ent
+            if ent is None and size > max_free_size:
+                max_free_size, max_free_offset = size, ofs
+
+            ofs += size
+
+        if ofs != fsize:
+            raise ValueError("final offset %s != file size %s in client "
+                             "cache file" % (ofs, fsize))
+        if __debug__:
+            self._verify_filemap()
+        self.currentofs = max_free_offset
+
+    def clearStats(self):
+        self._n_adds = self._n_added_bytes = 0
+        self._n_evicts = self._n_evicted_bytes = 0
+        self._n_accesses = 0
+
+    def getStats(self):
+        return (self._n_adds, self._n_added_bytes,
+                self._n_evicts, self._n_evicted_bytes,
+                self._n_accesses
+               )
+
+    ##
+    # The number of objects currently in the cache.
+    def __len__(self):
+        return len(self.key2entry)
+
+    ##
+    # Iterate over the objects in the cache, producing an Entry for each.
+    def __iter__(self):
+        return self.key2entry.itervalues()
+
+    ##
+    # Test whether an (oid, tid) pair is in the cache.
+    def __contains__(self, key):
+        return key in self.key2entry
+
+    ##
+    # Close the underlying file.  No methods accessing the cache should be
+    # used after this.
+    def close(self):
+        if hasattr(self,'_lock_file'):
+            self._lock_file.close()
+        if self.f:
+            sync(self.f)
+            self.f.close()
+            self.f = None
+
+    ##
+    # Evict objects as necessary to free up at least nbytes bytes,
+    # starting at currentofs.  If currentofs is closer than nbytes to
+    # the end of the file, currentofs is reset to ZEC3_HEADER_SIZE first.
+    # The number of bytes actually freed may be (and probably will be)
+    # greater than nbytes, and is _makeroom's return value.  The file is not
+    # altered by _makeroom.  filemap and key2entry are updated to reflect the
+    # evictions, and it's the caller's responsibility both to fiddle
+    # the file, and to update filemap, to account for all the space
+    # freed (starting at currentofs when _makeroom returns, and
+    # spanning the number of bytes returned by _makeroom).
+    def _makeroom(self, nbytes):
+        assert 0 < nbytes <= self.maxsize - ZEC3_HEADER_SIZE
+        if self.currentofs + nbytes > self.maxsize:
+            self.currentofs = ZEC3_HEADER_SIZE
+        ofs = self.currentofs
+        while nbytes > 0:
+            size, e = self.filemap.pop(ofs)
+            if e is not None:
+                del self.key2entry[e.key]
+                self._evictobj(e, size)
+            ofs += size
+            nbytes -= size
+        return ofs - self.currentofs
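+
+    # As a worked example with made-up sizes: if currentofs points at a
+    # 30-byte free block followed by a 100-byte allocated block,
+    # _makeroom(50) evicts the allocated block and returns 130 -- the
+    # caller may now overwrite 130 contiguous bytes starting at currentofs.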
+
+    ##
+    # Write Object obj, with data, to file starting at currentofs.
+    # nfreebytes are already available for overwriting, and it's
+    # guaranteed that's enough.  obj.offset is changed to reflect the
+    # new data record position, and filemap and key2entry are updated to
+    # match.
+    def _writeobj(self, obj, nfreebytes):
+        size = OBJECT_HEADER_SIZE + obj.size
+        assert size <= nfreebytes
+        excess = nfreebytes - size
+        # If there's any excess (which is likely), we need to record a
+        # free block following the end of the data record.  That isn't
+        # expensive -- it's all a contiguous write.
+        if excess == 0:
+            extra = ''
+        elif excess < 5:
+            extra = "01234"[excess]
+        else:
+            extra = 'f' + struct.pack(">I", excess)
+
+        self.f.seek(self.currentofs)
+
+        # Before writing the object data, mark the whole region being
+        # overwritten as a single free block, so the file still parses
+        # consistently if we crash partway through.  We'll come back with a
+        # last small write to rewrite the start of the allocated-block header.
+        self.f.write('f'+struct.pack(">I", nfreebytes))
+
+        # Now write the rest of the allocation block header and object data.
+        self.f.write(struct.pack(">8s8s", obj.key[0], obj.key[1]))
+        obj.serialize(self.f)
+        self.f.write(extra)
+
+        # Now, we'll go back and rewrite the beginning of the
+        # allocated block header.
+        self.f.seek(self.currentofs)
+        self.f.write('a'+struct.pack(">I", size))
+        
+        # Update index
+        e = Entry(obj.key, self.currentofs)
+        self.key2entry[obj.key] = e
+        self.filemap[self.currentofs] = size, e
+        self.currentofs += size
+        if excess:
+            # We need to record the free block in filemap, but there's
+            # no need to advance currentofs beyond it.  Instead it
+            # gives some breathing room for the next object to get
+            # written.
+            self.filemap[self.currentofs] = excess, None
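+
+        # As an illustration with made-up numbers: if nfreebytes is 120 and
+        # the record needs size == 100, the bytes at the old currentofs end
+        # up as a 100-byte 'a' block followed by a 20-byte 'f' block, and
+        # currentofs now points at that 20-byte free block.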
+
+    ##
+    # Add Object object to the cache.  This may evict existing objects, to
+    # make room (and almost certainly will, in steady state once the cache
+    # is first full).  The object must not already be in the cache.  If the
+    # object is too large for the cache, False is returned, otherwise True.
+    def add(self, object):
+        size = OBJECT_HEADER_SIZE + object.size
+        # A number of cache simulation experiments all concluded that the
+        # 2nd-level ZEO cache got a much higher hit rate if "very large"
+        # objects simply weren't cached.  For now, we ignore the request
+        # only if the entire cache file is too small to hold the object.
+        if size > self.maxsize - ZEC3_HEADER_SIZE:
+            return False
+
+        assert object.key not in self.key2entry
+        assert len(object.key[0]) == 8
+        assert len(object.key[1]) == 8
+
+        self._n_adds += 1
+        self._n_added_bytes += size
+
+        available = self._makeroom(size)
+        self._writeobj(object, available)
+        return True
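+
+    # Within this class the usual caller is store(), roughly (a sketch, not
+    # a verbatim excerpt):
+    #
+    #     o = Object((oid, start_tid), version, data, start_tid, end_tid)
+    #     if not self.add(o):
+    #         return  # the object is larger than the whole cache file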
+
+    ##
+    # Evict the object represented by Entry `e` from the cache, freeing
+    # `size` bytes in the file for reuse.  `size` is used only for summary
+    # statistics.  This does not alter the file, or self.filemap or
+    # self.key2entry (those are the caller's responsibilities).  It does
+    # invoke _evicted(Object) on our parent.
+    def _evictobj(self, e, size):
+        self._n_evicts += 1
+        self._n_evicted_bytes += size
+        # Load the object header into memory so we know how to
+        # update the parent's in-memory data structures.
+        self.f.seek(e.offset + OBJECT_HEADER_SIZE)
+        o = Object.fromFile(self.f, e.key, skip_data=True)
+        self._evicted(o)
+
+    ##
+    # Return Object for key, or None if not in cache.
+    def access(self, key):
+        self._n_accesses += 1
+        e = self.key2entry.get(key)
+        if e is None:
+            return None
+        offset = e.offset
+        size, e2 = self.filemap[offset]
+        assert e is e2
+
+        self.f.seek(offset + OBJECT_HEADER_SIZE)
+        return Object.fromFile(self.f, key)
+
+    ##
+    # Remove Object for key from cache, if present.
+    def remove(self, key):
+        # If an object is being explicitly removed, we need to load
+        # its header into memory and write a free block marker to the
+        # disk where the object was stored.  We need to load the
+        # header to update the in-memory data structures held by
+        # ClientCache.
+
+        # We could instead just keep the header in memory at all times.
+
+        e = self.key2entry.pop(key, None)
+        if e is None:
+            return
+        offset = e.offset
+        size, e2 = self.filemap[offset]
+        assert e is e2
+        self.filemap[offset] = size, None
+        self.f.seek(offset + OBJECT_HEADER_SIZE)
+        o = Object.fromFile(self.f, key, skip_data=True)
+        assert size >= 5  # only free blocks are tiny
+        # Because `size` >= 5, we can change an allocated block to a free
+        # block just by overwriting the 'a' status byte with 'f' -- the
+        # size field stays the same.
+        self.f.seek(offset)
+        self.f.write('f')
+        self.f.flush()
+        self._evicted(o)
+
+    ##
+    # Update on-disk representation of Object obj.
+    #
+    # This method should be called when the object header is modified.
+    # obj must be in the cache.  The only real use for this is during
+    # invalidation, to set the end_tid field on a revision that was current
+    # (and so had an end_tid of None, but no longer does).
+    def update(self, obj):
+        e = self.key2entry[obj.key]
+        self.f.seek(e.offset + OBJECT_HEADER_SIZE)
+        obj.serialize_header(self.f)
+
+    ##
+    # Update our idea of the most recent tid.  This is stored in the
+    # instance, and also written out near the start of the cache file.  The
+    # new tid must be strictly greater than our current idea of the most
+    # recent tid.
+    def setLastTid(self, tid):
+        if self.tid is not None and tid <= self.tid:
+            raise ValueError("new last tid (%s) must be greater than "
+                             "previous one (%s)" % (u64(tid),
+                                                    u64(self.tid)))
+        assert isinstance(tid, str) and len(tid) == 8
+        self.tid = tid
+        self.f.seek(len(magic))
+        self.f.write(tid)
+        self.f.flush()
+
+
+    ##
+    # This debug method marches over the entire cache file, verifying that
+    # the current contents match the info in self.filemap and self.key2entry.
+    def _verify_filemap(self, display=False):
+        a = ZEC3_HEADER_SIZE
+        f = self.f
+        while a < self.maxsize:
+            f.seek(a)
+            status = f.read(1)
+            if status in 'af':
+                size, = struct.unpack(">I", f.read(4))
+            else:
+                size = int(status)
+            if display:
+                if a == self.currentofs:
+                    print '*****',
+                print "%c%d" % (status, size),
+            size2, obj = self.filemap[a]
+            assert size == size2
+            assert (obj is not None) == (status == 'a')
+            if obj is not None:
+                assert obj.offset == a
+                assert self.key2entry[obj.key] is obj
+            a += size
+        if display:
+            print
+        assert a == self.maxsize
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
     def open(self):
-        self.fc.scan(self.install)
+        self.scan(self.install)
 
     ##
     # Callback for FileCache.scan(), when a pre-existing file cache is
@@ -136,31 +607,20 @@
             else:
                 self.noncurrent[oid] = [this_span]
 
-    def close(self):
-        self.fc.close()
-        if self._tracefile:
-            sync(self._tracefile)
-            self._tracefile.close()
-            self._tracefile = None
-
     ##
     # Set the last transaction seen by the cache.
     # @param tid a transaction id
     # @exception ValueError attempt to set a new tid less than the current tid
 
-    def setLastTid(self, tid):
-        self.fc.settid(tid)
-
     ##
     # Return the last transaction seen by the cache.
     # @return a transaction id
     # @defreturn string, or None if no transaction is yet known
-
     def getLastTid(self):
-        if self.fc.tid == z64:
+        if self.tid == z64:
             return None
         else:
-            return self.fc.tid
+            return self.tid
 
     ##
     # Return the current data record for oid and version.
@@ -187,7 +647,7 @@
         if tid is None:
             self._trace(0x20, oid, version)
             return None
-        o = self.fc.access((oid, tid))
+        o = self.access((oid, tid))
         if o is None:
             self._trace(0x20, oid, version)
             return None
@@ -222,7 +682,7 @@
         if tid > hi:    # we don't have any data in the right range
             self._trace(0x24, oid, "", tid)
             return None
-        o = self.fc.access((oid, lo))
+        o = self.access((oid, lo))
         self._trace(0x26, oid, "", tid)
         return o.data, o.start_tid, o.end_tid
 
@@ -260,7 +720,7 @@
         # the requested version, doesn't find it, then asks the server
         # for that data.  The server returns the non-version data,
         # which may already be in the cache.
-        if (oid, start_tid) in self.fc:
+        if (oid, start_tid) in self:
             return
         o = Object((oid, start_tid), version, data, start_tid, end_tid)
         if version:
@@ -270,7 +730,7 @@
                 if self.version[oid] != (version, start_tid):
                     raise ValueError("data already exists for version %r"
                                      % self.version[oid][0])
-            if not self.fc.add(o):
+            if not self.add(o):
                 return # too large
             self.version[oid] = version, start_tid
             self._trace(0x50, oid, version, start_tid, dlen=len(data))
@@ -283,7 +743,7 @@
                             "already have current data for oid")
                     else:
                         return
-                if not self.fc.add(o):
+                if not self.add(o):
                     return # too large
                 self.current[oid] = start_tid
                 self._trace(0x52, oid, version, start_tid, dlen=len(data))
@@ -292,7 +752,7 @@
                 p = start_tid, end_tid
                 if p in L:
                     return # duplicate store
-                if not self.fc.add(o):
+                if not self.add(o):
                     return # too large
                 bisect.insort_left(L, p)
                 self._trace(0x54, oid, version, start_tid, end_tid,
@@ -311,7 +771,7 @@
             for old_tid, dummy in noncurrent_list[:]:
                 # 0x1E = invalidate (hit, discarding current or non-current)
                 self._trace(0x1E, oid, version, tid)
-                self.fc.remove((oid, old_tid))
+                self.remove((oid, old_tid))
             # fc.remove() calling back to _evicted() should have removed
             # the list from noncurrent when the last non-current revision
             # was removed.
@@ -331,8 +791,8 @@
     #        or None to forget all cached info about oid (version, current
     #        revision, and non-current revisions)
     def invalidate(self, oid, version, tid):
-        if tid > self.fc.tid and tid is not None:
-            self.fc.settid(tid)
+        if tid > self.tid and tid is not None:
+            self.setLastTid(tid)
 
         remove_all_knowledge_of_oid = tid is None
 
@@ -342,7 +802,7 @@
             self._trace(0x1A, oid, version, tid)
             dllversion, dlltid = self.version[oid]
             assert not version or version == dllversion, (version, dllversion)
-            self.fc.remove((oid, dlltid))
+            self.remove((oid, dlltid))
             assert oid not in self.version # .remove() got rid of it
             # And continue:  we must also remove any non-version data from
             # the cache.  Or, at least, I have such a poor understanding of
@@ -365,7 +825,7 @@
         if remove_all_knowledge_of_oid:
             # 0x1E = invalidate (hit, discarding current or non-current)
             self._trace(0x1E, oid, version, tid)
-            self.fc.remove((oid, cur_tid))
+            self.remove((oid, cur_tid))
             assert cur_tid not in self.current  # .remove() got rid of it
             return
 
@@ -377,7 +837,7 @@
 
         # Update the end_tid half of oid's validity range on disk.
         # TODO: Want to fetch object without marking it as accessed.
-        o = self.fc.access((oid, cur_tid))
+        o = self.access((oid, cur_tid))
         assert o is not None
         assert o.end_tid is None  # i.e., o was current
         if o is None:
@@ -385,20 +845,11 @@
             # should be removed; waiting on time to prove it can't happen.
             return
         o.end_tid = tid
-        self.fc.update(o)   # record the new end_tid on disk
+        self.update(o)   # record the new end_tid on disk
         # Add to oid's list of non-current data.
         L = self.noncurrent.setdefault(oid, [])
         bisect.insort_left(L, (cur_tid, tid))
 
-    ##
-    # Return the number of object revisions in the cache.
-    #
-    # Or maybe better to just return len(self.cache)?  Needs clearer use case.
-    def __len__(self):
-        n = len(self.current) + len(self.version)
-        if self.noncurrent:
-            n += sum(map(len, self.noncurrent))
-        return n
 
     ##
     # Generates (oid, serial, version) triples for all objects in the
@@ -406,10 +857,10 @@
     def contents(self):
         # May need to materialize list instead of iterating;
         # depends on whether the caller may change the cache.
-        for o in self.fc:
+        for o in self:
             oid, tid = o.key
             if oid in self.version:
-                obj = self.fc.access(o.key)
+                obj = self.access(o.key)
                 yield oid, tid, obj.version
             else:
                 yield oid, tid, ""
@@ -422,7 +873,7 @@
         for oid, tid, version in L:
             print oid_repr(oid), oid_repr(tid), repr(version)
         print "dll contents"
-        L = list(self.fc)
+        L = list(self)
         L.sort(lambda x, y: cmp(x.key, y.key))
         for x in L:
             end_tid = x.end_tid or z64
@@ -659,463 +1110,10 @@
         self.offset = offset
 
 
-
-##
-# FileCache stores a cache in a single on-disk file.
-#
-# On-disk cache structure.
-#
-# The file begins with a 12-byte header.  The first four bytes are the
-# file's magic number - ZEC3 - indicating zeo cache version 3.  The
-# next eight bytes are the last transaction id.
-
-magic = "ZEC3"
-ZEC3_HEADER_SIZE = 12
-
-# After the header, the file contains a contiguous sequence of blocks.  All
-# blocks begin with a one-byte status indicator:
-#
-# 'a'
-#       Allocated.  The block holds an object; the next 4 bytes are >I
-#       format total block size.
-#
-# 'f'
-#       Free.  The block is free; the next 4 bytes are >I format total
-#       block size.
-#
-# '1', '2', '3', '4'
-#       The block is free, and consists of 1, 2, 3 or 4 bytes total.
-#
-# "Total" includes the status byte, and size bytes.  There are no
-# empty (size 0) blocks.
-
-
-# Allocated blocks have more structure:
-#
-#     1 byte allocation status ('a').
-#     4 bytes block size, >I format.
-#     16 bytes oid + tid, string.
-#     size-OBJECT_HEADER_SIZE bytes, the serialization of an Object (see
-#         class Object for details).
-
-OBJECT_HEADER_SIZE = 1 + 4 + 16
-
-# The cache's currentofs goes around the file, circularly, forever.
-# It's always the starting offset of some block.
-#
-# When a new object is added to the cache, it's stored beginning at
-# currentofs, and currentofs moves just beyond it.  As many contiguous
-# blocks needed to make enough room for the new object are evicted,
-# starting at currentofs.  Exception:  if currentofs is close enough
-# to the end of the file that the new object can't fit in one
-# contiguous chunk, currentofs is reset to ZEC3_HEADER_SIZE first.
-
-# Do all possible to ensure that the bytes we wrote to file f are really on
-# disk.
 def sync(f):
     f.flush()
-    if hasattr(os, 'fsync'):
-        os.fsync(f.fileno())
 
-class FileCache(object):
-
-    def __init__(self, maxsize, fpath, parent):
-        # - `maxsize`:  total size of the cache file, in bytes; this is
-        #   ignored path names an existing file; perhaps we should attempt
-        #   to change the cache size in that case
-        # - `fpath`:  filepath for the cache file, or None (in which case
-        #   a temp file will be created)
-        # - `parent`:  the ClientCache instance; its `_evicted()` method
-        #   is called whenever we need to evict an object to make room in
-        #   the file
-        self.maxsize = maxsize
-        self.parent = parent
-
-        # tid for the most recent transaction we know about.  This is also
-        # stored near the start of the file.
-        self.tid = None
-
-        # There's one Entry instance, kept in memory, for each currently
-        # allocated block in the file, and there's one allocated block in the
-        # file per serialized Object.  filemap retrieves the Entry given the
-        # starting offset of a block, and key2entry retrieves the Entry given
-        # an object revision's key (an (oid, start_tid) pair).  From an
-        # Entry, we can get the Object's key and file offset.
-
-        # Map offset in file to pair (data record size, Entry).
-        # Entry is None iff the block starting at offset is free.
-        # filemap always contains a complete account of what's in the
-        # file -- study method _verify_filemap for executable checking
-        # of the relevant invariants.  An offset is at the start of a
-        # block iff it's a key in filemap.  The data record size is
-        # stored in the file too, so we could just seek to the offset
-        # and read it up; keeping it in memory is an optimization.
-        self.filemap = {}
-
-        # Map key to Entry.  After
-        #     obj = key2entry[key]
-        # then
-        #     obj.key == key
-        # is true.  An object is currently stored on disk iff its key is in
-        # key2entry.
-        self.key2entry = {}
-
-        # Always the offset into the file of the start of a block.
-        # New and relocated objects are always written starting at
-        # currentofs.
-        self.currentofs = ZEC3_HEADER_SIZE
-
-        # self.f is the open file object.
-        # When we're not reusing an existing file, self.f is left None
-        # here -- the scan() method must be called then to open the file
-        # (and it sets self.f).
-
-        self.fpath = fpath
-
-        if fpath:
-            self._lock_file = ZODB.lock_file.LockFile(fpath + '.lock')
-        
-        if fpath and os.path.exists(fpath):
-            # Reuse an existing file.  scan() will open & read it.
-            self.f = None
-            logger.info("reusing persistent cache file %r", fpath)
-        else:
-            if fpath:
-                self.f = open(fpath, 'wb+')
-                logger.info("created persistent cache file %r", fpath)
-            else:
-                self.f = tempfile.TemporaryFile()
-                logger.info("created temporary cache file %r", self.f.name)
-            # Make sure the OS really saves enough bytes for the file.
-            self.f.seek(self.maxsize - 1)
-            self.f.write('x')
-            self.f.truncate()
-            # Start with one magic header block
-            self.f.seek(0)
-            self.f.write(magic)
-            self.f.write(z64)
-            # and one free block.
-            self.f.write('f' + struct.pack(">I", self.maxsize -
-                                                 ZEC3_HEADER_SIZE))
-            self.sync()
-            self.filemap[ZEC3_HEADER_SIZE] = (self.maxsize - ZEC3_HEADER_SIZE,
-                                              None)
-
-        # Statistics:  _n_adds, _n_added_bytes,
-        #              _n_evicts, _n_evicted_bytes,
-        #              _n_accesses
-        self.clearStats()
-
-    ##
-    # Scan the current contents of the cache file, calling `install`
-    # for each object found in the cache.  This method should only
-    # be called once to initialize the cache from disk.
-    def scan(self, install):
-        if self.f is not None:  # we're not (re)using a pre-existing file
-            return
-        fsize = os.path.getsize(self.fpath)
-        if fsize != self.maxsize:
-            logger.warning("existing cache file %r has size %d; "
-                           "requested size %d ignored", self.fpath,
-                           fsize, self.maxsize)
-            self.maxsize = fsize
-        self.f = open(self.fpath, 'rb+')
-        _magic = self.f.read(4)
-        if _magic != magic:
-            raise ValueError("unexpected magic number: %r" % _magic)
-        self.tid = self.f.read(8)
-        if len(self.tid) != 8:
-            raise ValueError("cache file too small -- no tid at start")
-
-        # Populate .filemap and .key2entry to reflect what's currently in the
-        # file, and tell our parent about it too (via the `install` callback).
-        # Remember the location of the largest free block.  That seems a
-        # decent place to start currentofs.
-        max_free_size = 0
-        ofs = max_free_offset = ZEC3_HEADER_SIZE
-        while ofs < fsize:
-            self.f.seek(ofs)
-            ent = None
-            status = self.f.read(1)
-            if status == 'a':
-                size, rawkey = struct.unpack(">I16s", self.f.read(20))
-                key = rawkey[:8], rawkey[8:]
-                assert key not in self.key2entry
-                self.key2entry[key] = ent = Entry(key, ofs)
-                install(self.f, ent)
-            elif status == 'f':
-                size, = struct.unpack(">I", self.f.read(4))
-            elif status in '1234':
-                size = int(status)
-            else:
-                raise ValueError("unknown status byte value %s in client "
-                                 "cache file" % 0, hex(ord(status)))
-
-            self.filemap[ofs] = size, ent
-            if ent is None and size > max_free_size:
-                max_free_size, max_free_offset = size, ofs
-
-            ofs += size
-
-        if ofs != fsize:
-            raise ValueError("final offset %s != file size %s in client "
-                             "cache file" % (ofs, fsize))
-        if __debug__:
-            self._verify_filemap()
-        self.currentofs = max_free_offset
-
-    def clearStats(self):
-        self._n_adds = self._n_added_bytes = 0
-        self._n_evicts = self._n_evicted_bytes = 0
-        self._n_accesses = 0
-
-    def getStats(self):
-        return (self._n_adds, self._n_added_bytes,
-                self._n_evicts, self._n_evicted_bytes,
-                self._n_accesses
-               )
-
-    ##
-    # The number of objects currently in the cache.
-    def __len__(self):
-        return len(self.key2entry)
-
-    ##
-    # Iterate over the objects in the cache, producing an Entry for each.
-    def __iter__(self):
-        return self.key2entry.itervalues()
-
-    ##
-    # Test whether an (oid, tid) pair is in the cache.
-    def __contains__(self, key):
-        return key in self.key2entry
-
-    ##
-    # Do all possible to ensure all bytes written to the file so far are
-    # actually on disk.
-    def sync(self):
-        sync(self.f)
-
-    ##
-    # Close the underlying file.  No methods accessing the cache should be
-    # used after this.
-    def close(self):
-        if hasattr(self,'_lock_file'):
-            self._lock_file.close()
-        if self.f:
-            self.sync()
-            self.f.close()
-            self.f = None
-
-    ##
-    # Evict objects as necessary to free up at least nbytes bytes,
-    # starting at currentofs.  If currentofs is closer than nbytes to
-    # the end of the file, currentofs is reset to ZEC3_HEADER_SIZE first.
-    # The number of bytes actually freed may be (and probably will be)
-    # greater than nbytes, and is _makeroom's return value.  The file is not
-    # altered by _makeroom.  filemap and key2entry are updated to reflect the
-    # evictions, and it's the caller's responsibility both to fiddle
-    # the file, and to update filemap, to account for all the space
-    # freed (starting at currentofs when _makeroom returns, and
-    # spanning the number of bytes retured by _makeroom).
-    def _makeroom(self, nbytes):
-        assert 0 < nbytes <= self.maxsize - ZEC3_HEADER_SIZE
-        if self.currentofs + nbytes > self.maxsize:
-            self.currentofs = ZEC3_HEADER_SIZE
-        ofs = self.currentofs
-        while nbytes > 0:
-            size, e = self.filemap.pop(ofs)
-            if e is not None:
-                del self.key2entry[e.key]
-                self._evictobj(e, size)
-            ofs += size
-            nbytes -= size
-        return ofs - self.currentofs
-
-    ##
-    # Write Object obj, with data, to file starting at currentofs.
-    # nfreebytes are already available for overwriting, and it's
-    # guranteed that's enough.  obj.offset is changed to reflect the
-    # new data record position, and filemap and key2entry are updated to
-    # match.
-    def _writeobj(self, obj, nfreebytes):
-        size = OBJECT_HEADER_SIZE + obj.size
-        assert size <= nfreebytes
-        excess = nfreebytes - size
-        # If there's any excess (which is likely), we need to record a
-        # free block following the end of the data record.  That isn't
-        # expensive -- it's all a contiguous write.
-        if excess == 0:
-            extra = ''
-        elif excess < 5:
-            extra = "01234"[excess]
-        else:
-            extra = 'f' + struct.pack(">I", excess)
-
-        self.f.seek(self.currentofs)
-
-        # Before writing data, we'll write a free block for the space freed.
-        # We'll come back with a last atomic write to rewrite the start of the
-        # allocated-block header.
-        self.f.write('f'+struct.pack(">I", nfreebytes))
-
-        # Now write the rest of the allocation block header and object data.
-        self.f.write(struct.pack(">8s8s", obj.key[0], obj.key[1]))
-        obj.serialize(self.f)
-        self.f.write(extra)
-
-        # Now, we'll go back and rewrite the beginning of the
-        # allocated block header.
-        self.f.seek(self.currentofs)
-        self.f.write('a'+struct.pack(">I", size))
-        
-        # Update index
-        e = Entry(obj.key, self.currentofs)
-        self.key2entry[obj.key] = e
-        self.filemap[self.currentofs] = size, e
-        self.currentofs += size
-        if excess:
-            # We need to record the free block in filemap, but there's
-            # no need to advance currentofs beyond it.  Instead it
-            # gives some breathing room for the next object to get
-            # written.
-            self.filemap[self.currentofs] = excess, None
-
-    ##
-    # Add Object object to the cache.  This may evict existing objects, to
-    # make room (and almost certainly will, in steady state once the cache
-    # is first full).  The object must not already be in the cache.  If the
-    # object is too large for the cache, False is returned, otherwise True.
-    def add(self, object):
-        size = OBJECT_HEADER_SIZE + object.size
-        # A number of cache simulation experiments all concluded that the
-        # 2nd-level ZEO cache got a much higher hit rate if "very large"
-        # objects simply weren't cached.  For now, we ignore the request
-        # only if the entire cache file is too small to hold the object.
-        if size > self.maxsize - ZEC3_HEADER_SIZE:
-            return False
-
-        assert object.key not in self.key2entry
-        assert len(object.key[0]) == 8
-        assert len(object.key[1]) == 8
-
-        self._n_adds += 1
-        self._n_added_bytes += size
-
-        available = self._makeroom(size)
-        self._writeobj(object, available)
-        return True
-
-    ##
-    # Evict the object represented by Entry `e` from the cache, freeing
-    # `size` bytes in the file for reuse.  `size` is used only for summary
-    # statistics.  This does not alter the file, or self.filemap or
-    # self.key2entry (those are the caller's responsibilities).  It does
-    # invoke _evicted(Object) on our parent.
-    def _evictobj(self, e, size):
-        self._n_evicts += 1
-        self._n_evicted_bytes += size
-        # Load the object header into memory so we know how to
-        # update the parent's in-memory data structures.
-        self.f.seek(e.offset + OBJECT_HEADER_SIZE)
-        o = Object.fromFile(self.f, e.key, skip_data=True)
-        self.parent._evicted(o)
-
-    ##
-    # Return Object for key, or None if not in cache.
-    def access(self, key):
-        self._n_accesses += 1
-        e = self.key2entry.get(key)
-        if e is None:
-            return None
-        offset = e.offset
-        size, e2 = self.filemap[offset]
-        assert e is e2
-
-        self.f.seek(offset + OBJECT_HEADER_SIZE)
-        return Object.fromFile(self.f, key)
-
-    ##
-    # Remove Object for key from cache, if present.
-    def remove(self, key):
-        # If an object is being explicitly removed, we need to load
-        # its header into memory and write a free block marker to the
-        # disk where the object was stored.  We need to load the
-        # header to update the in-memory data structures held by
-        # ClientCache.
-
-        # We could instead just keep the header in memory at all times.
-
-        e = self.key2entry.pop(key, None)
-        if e is None:
-            return
-        offset = e.offset
-        size, e2 = self.filemap[offset]
-        assert e is e2
-        self.filemap[offset] = size, None
-        self.f.seek(offset + OBJECT_HEADER_SIZE)
-        o = Object.fromFile(self.f, key, skip_data=True)
-        assert size >= 5  # only free blocks are tiny
-        # Because `size` >= 5, we can change an allocated block to a free
-        # block just by overwriting the 'a' status byte with 'f' -- the
-        # size field stays the same.
-        self.f.seek(offset)
-        self.f.write('f')
-        self.f.flush()
-        self.parent._evicted(o)
-
-    ##
-    # Update on-disk representation of Object obj.
-    #
-    # This method should be called when the object header is modified.
-    # obj must be in the cache.  The only real use for this is during
-    # invalidation, to set the end_tid field on a revision that was current
-    # (and so had an end_tid of None, but no longer does).
-    def update(self, obj):
-        e = self.key2entry[obj.key]
-        self.f.seek(e.offset + OBJECT_HEADER_SIZE)
-        obj.serialize_header(self.f)
-
-    ##
-    # Update our idea of the most recent tid.  This is stored in the
-    # instance, and also written out near the start of the cache file.  The
-    # new tid must be strictly greater than our current idea of the most
-    # recent tid.
-    def settid(self, tid):
-        if self.tid is not None and tid <= self.tid:
-            raise ValueError("new last tid (%s) must be greater than "
-                             "previous one (%s)" % (u64(tid),
-                                                    u64(self.tid)))
-        assert isinstance(tid, str) and len(tid) == 8
-        self.tid = tid
-        self.f.seek(len(magic))
-        self.f.write(tid)
-        self.f.flush()
-
-    ##
-    # This debug method marches over the entire cache file, verifying that
-    # the current contents match the info in self.filemap and self.key2entry.
-    def _verify_filemap(self, display=False):
-        a = ZEC3_HEADER_SIZE
-        f = self.f
-        while a < self.maxsize:
-            f.seek(a)
-            status = f.read(1)
-            if status in 'af':
-                size, = struct.unpack(">I", f.read(4))
-            else:
-                size = int(status)
-            if display:
-                if a == self.currentofs:
-                    print '*****',
-                print "%c%d" % (status, size),
-            size2, obj = self.filemap[a]
-            assert size == size2
-            assert (obj is not None) == (status == 'a')
-            if obj is not None:
-                assert obj.offset == a
-                assert self.key2entry[obj.key] is obj
-            a += size
-        if display:
-            print
-        assert a == self.maxsize
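+# If the platform provides os.fsync, redefine sync() so that flushed bytes
+# are also forced to disk, not just handed to the OS.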
+if hasattr(os, 'fsync'):
+    def sync(f):
+        f.flush()
+        os.fsync(f.fileno())

Modified: ZODB/branches/3.8/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/branches/3.8/src/ZEO/tests/test_cache.py	2008-05-11 09:48:43 UTC (rev 86649)
+++ ZODB/branches/3.8/src/ZEO/tests/test_cache.py	2008-05-11 15:15:12 UTC (rev 86650)
@@ -109,21 +109,20 @@
 
     def testEviction(self):
         # Manually override the current maxsize
-        maxsize = self.cache.size = self.cache.fc.maxsize = 3395 # 1245
-        self.cache.fc = ZEO.cache.FileCache(3395, None, self.cache)
+        cache = ZEO.cache.ClientCache(None, 3395)
 
         # Trivial test of eviction code.  Doesn't test non-current
         # eviction.
         data = ["z" * i for i in range(100)]
         for i in range(50):
             n = p64(i)
-            self.cache.store(n, "", n, None, data[i])
-            self.assertEquals(len(self.cache), i + 1)
+            cache.store(n, "", n, None, data[i])
+            self.assertEquals(len(cache), i + 1)
         # The cache now uses 1225 bytes.  The next insert
         # should delete some objects.
         n = p64(50)
-        self.cache.store(n, "", n, None, data[51])
-        self.assert_(len(self.cache) < 51)
+        cache.store(n, "", n, None, data[51])
+        self.assert_(len(cache) < 51)
 
         # TODO:  Need to make sure eviction of non-current data
         # and of version data are handled correctly.
@@ -138,9 +137,9 @@
         # Copy data from self.cache into path, reaching into the cache
         # guts to make the copy.
         dst = open(path, "wb+")
-        src = self.cache.fc.f
+        src = self.cache.f
         src.seek(0)
-        dst.write(src.read(self.cache.fc.maxsize))
+        dst.write(src.read(self.cache.maxsize))
         dst.close()
         copy = ZEO.cache.ClientCache(path)
         copy.open()


