[Zodb-checkins] CVS: ZODB3/ZEO - cache.py:1.2 stats.py:1.22 StorageServer.py:1.104 ServerStub.py:1.18 ClientStorage.py:1.113 ICache.py:NONE ClientCache.py:NONE

Jeremy Hylton jeremy at zope.com
Wed Dec 24 11:02:43 EST 2003


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv27465/ZEO

Modified Files:
	stats.py StorageServer.py ServerStub.py ClientStorage.py 
Added Files:
	cache.py 
Removed Files:
	ICache.py ClientCache.py 
Log Message:
Merge MVCC branch to the HEAD.


=== ZODB3/ZEO/cache.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 24 11:02:42 2003
+++ ZODB3/ZEO/cache.py	Wed Dec 24 11:02:03 2003
@@ -0,0 +1,877 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Disk-based client cache for ZEO.
+
+ClientCache exposes an API used by the ZEO client storage.  FileCache
+stores objects on disk, using a 2-tuple of oid and tid as the key.
+
+The upper cache's API is similar to a storage API with methods like
+load(), store(), and invalidate().  It manages in-memory data
+structures that allow it to map this richer API onto the simple
+key-based API of the lower-level cache.
+"""
+
+import bisect
+import logging
+import os
+import struct
+import tempfile
+import time
+
+from sets import Set
+
+from ZODB.utils import z64, u64
+
+##
+# A disk-based cache for ZEO clients.
+# <p>
+# This class provides an interface to a persistent, disk-based cache
+# used by ZEO clients to store copies of database records from the
+# server.
+# <p>
+# The details of the constructor are unspecified at this point.
+# <p>
+# Each entry in the cache is valid for a particular range of transaction
+# ids.  The lower bound is the transaction that wrote the data.  The
+# upper bound is the next transaction that wrote a revision of the
+# object.  If the data is current, the upper bound is stored as None;
+# the data is considered current until an invalidate() call is made.
+# <p>
+# It is an error to call store() twice with the same object without an
+# intervening invalidate() to set the upper bound on the first cache
+# entry.  <em>Perhaps it will be necessary to have a call that removes
+# something from the cache outright, without keeping a non-current
+# entry.</em>
+# <h3>Cache verification</h3>
+# <p>
+# When the client is connected to the server, it receives
+# invalidations every time an object is modified.  When the client is
+# disconnected, it must perform cache verification to make sure its
+# cached data is synchronized with the storage's current state.
+# <p>
+# quick verification
+# full verification
+# <p>
+
+class ClientCache:
+    """A simple in-memory cache."""
+
+    ##
+    # Do we put the constructor here?
+    # @param path path of persistent snapshot of cache state
+    # @param size maximum size of object data, in bytes
+
+    def __init__(self, path=None, size=None, trace=True):
+        self.path = path
+        self.size = size
+        self.log = logging.getLogger("zeo.cache")
+
+        if trace and path:
+            self._setup_trace()
+        else:
+            self._trace = self._notrace
+
+        # Last transaction seen by the cache, either via setLastTid()
+        # or by invalidate().
+        self.tid = None
+
+        # The cache stores objects in a dict mapping (oid, tid) pairs
+        # to Object() records (see below).  The tid is the transaction
+        # id that wrote the object.  An object record includes data,
+        # serialno, and end tid.  It has auxiliary data structures to
+        # compute the appropriate tid, given the oid and a transaction id
+        # representing an arbitrary point in history.
+        #
+        # The serialized form of the cache just stores the Object()
+        # records.  The in-memory form can be reconstructed from these
+        # records.
+
+        # Maps oid to current tid.  Used to compute the key for current
+        # objects.
+        self.current = {}
+        # Maps oid to list of (start_tid, end_tid) pairs in sorted order.
+        # Used to find matching key for load of non-current data.
+        self.noncurrent = {}
+        # Map oid to version, tid pair.  If there is no entry, the object
+        # is not modified in a version.
+        self.version = {}
+
+        # The lower-level FileCache (see below) stores the object data
+        # on disk.  It makes decisions about which objects to keep and
+        # which to evict.
+        self.fc = FileCache(size or 10**6, self.path, self)
+
+    def open(self):
+        self.fc.scan(self.install)
+
+    def install(self, f, ent):
+        # Called by cache storage layer to insert object
+        o = Object.fromFile(f, ent.key, header_only=True)
+        if o is None:
+            return
+        oid = o.key[0]
+        if o.version:
+            self.version[oid] = o.version, o.start_tid
+        elif o.end_tid is None:
+            self.current[oid] = o.start_tid
+        else:
+            L = self.noncurrent.setdefault(oid, [])
+            bisect.insort_left(L, (o.start_tid, o.end_tid))
+
+    def close(self):
+        self.fc.close()
+
+    ##
+    # Set the last transaction seen by the cache.
+    # @param tid a transaction id
+    # @exception ValueError attempt to set a new tid less than the current tid
+
+    def setLastTid(self, tid):
+        self.fc.settid(tid)
+
+    ##
+    # Return the last transaction seen by the cache.
+    # @return a transaction id
+    # @defreturn string
+
+    def getLastTid(self):
+        if self.fc.tid == z64:
+            return None
+        else:
+            return self.fc.tid
+
+    ##
+    # Return the current data record for oid and version.
+    # @param oid object id
+    # @param version a version string
+    # @return data record, tid, and version, or None if the object is
+    #         not in the cache
+    # @defreturn 3-tuple: (string, string, string)
+
+    def load(self, oid, version=""):
+        tid = None
+        if version:
+            p = self.version.get(oid)
+            if p is None:
+                return None
+            elif p[0] == version:
+                tid = p[1]
+            # Otherwise, we know the cache has version data but not
+            # for the requested version.  Thus, we know it is safe
+            # to return the non-version data from the cache.
+        if tid is None:
+            tid = self.current.get(oid)
+        if tid is None:
+            self._trace(0x20, oid, version)
+            return None
+        o = self.fc.access((oid, tid))
+        if o is None:
+            return None
+        self._trace(0x22, oid, version, o.start_tid, o.end_tid, len(o.data))
+        return o.data, tid, o.version
+
+    ##
+    # Return a non-current revision of oid that was current before tid.
+    # @param oid object id
+    # @param tid id of transaction that wrote next revision of oid
+    # @return data record, start tid, and end tid
+    # @defreturn 3-tuple: (string, string, string)
+
+    def loadBefore(self, oid, tid):
+        L = self.noncurrent.get(oid)
+        if L is None:
+            self._trace(0x24, oid, tid)
+            return None
+        # A pair with None as the second element will always be less
+        # than any pair with the same first tid.
+        i = bisect.bisect_left(L, (tid, None))
+        # The least element left of tid was written before tid.  If
+        # there is no element, the cache doesn't have old enough data.
+        if i == 0:
+            self._trace(0x24, oid, tid)
+            return
+        lo, hi = L[i-1]
+        # XXX lo should always be less than tid
+        if not lo < tid <= hi:
+            self._trace(0x24, oid, tid)
+            return None
+        o = self.fc.access((oid, lo))
+        self._trace(0x26, oid, tid)
+        return o.data, o.start_tid, o.end_tid
+
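A worked example of this lookup (note that it relies on Python 2 comparison rules, where None orders before any string):

    import bisect

    # Sorted (start_tid, end_tid) pairs; tids abbreviated for clarity.
    L = [("t2", "t4"), ("t4", "t7"), ("t7", "t9")]
    tid = "t5"
    i = bisect.bisect_left(L, (tid, None))   # i == 2
    lo, hi = L[i - 1]                        # ("t4", "t7")
    assert lo < tid <= hi    # the revision written at t4 was current
                             # until t7, so it is the one visible at t5
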
+    ##
+    # Return the version an object is modified in or None for an
+    # object that is not modified in a version.
+    # @param oid object id
+    # @return name of version in which the object is modified
+    # @defreturn string or None
+
+    def modifiedInVersion(self, oid):
+        p = self.version.get(oid)
+        if p is None:
+            return None
+        version, tid = p
+        return version
+
+    ##
+    # Store a new data record in the cache.
+    # @param oid object id
+    # @param version name of version that oid was modified in.  The cache
+    #                only stores current version data, so end_tid should
+    #                be None.
+    # @param start_tid the id of the transaction that wrote this revision
+    # @param end_tid the id of the transaction that created the next
+    #                revision of oid.  If end_tid is None, the data is
+    #                current.
+    # @param data the actual data
+    # @exception ValueError tried to store non-current version data
+
+    def store(self, oid, version, start_tid, end_tid, data):
+        # It's hard for the client to avoid storing the same object
+        # more than once.  One case is when the client requests
+        # version data that doesn't exist.  It checks the cache for
+        # the requested version, doesn't find it, then asks the server
+        # for that data.  The server returns the non-version data,
+        # which may already be in the cache.
+        if (oid, start_tid) in self.fc:
+            return
+        o = Object((oid, start_tid), version, data, start_tid, end_tid)
+        if version:
+            if end_tid is not None:
+                raise ValueError("cache only stores current version data")
+            if oid in self.version:
+                if self.version[oid] != (version, start_tid):
+                    raise ValueError("data already exists for version %r"
+                                     % self.version[oid][0])
+            self.version[oid] = version, start_tid
+            self._trace(0x50, oid, version, start_tid, dlen=len(data))
+        else:
+            if end_tid is None:
+                _cur_start = self.current.get(oid)
+                if _cur_start:
+                    if _cur_start != start_tid:
+                        raise ValueError(
+                            "already have current data for oid")
+                    else:
+                        return
+                self.current[oid] = start_tid
+                self._trace(0x52, oid, version, start_tid, dlen=len(data))
+            else:
+                L = self.noncurrent.setdefault(oid, [])
+                p = start_tid, end_tid
+                if p in L:
+                    return # duplicate store
+                bisect.insort_left(L, (start_tid, end_tid))
+                self._trace(0x54, oid, version, start_tid, end_tid,
+                            dlen=len(data))
+        self.fc.add(o)
+
+    ##
+    # Mark the current data for oid as non-current.  If there is no
+    # current data for oid, do nothing.
+    # @param oid object id
+    # @param version name of version to invalidate.
+    # @param tid the id of the transaction that wrote a new revision of oid
+
+    def invalidate(self, oid, version, tid):
+        if tid > self.fc.tid:
+            self.fc.settid(tid)
+        if oid in self.version:
+            self._trace(0x1A, oid, version, tid)
+            dllversion, dlltid = self.version[oid]
+            assert not version or version == dllversion, (version, dllversion)
+            # remove() will call unlink() to delete from self.version
+            self.fc.remove((oid, dlltid))
+            # And continue on, we must also remove any non-version data
+            # from the cache.  This is a bit of a failure of the current
+            # cache consistency approach as the new tid of the version
+            # data gets confused with the old tid of the non-version data.
+            # I could sort this out, but it seems simpler to punt and
+            # have the cache invalidate too much for versions.
+
+        if oid not in self.current:
+            self._trace(0x10, oid, version, tid)
+            return
+        cur_tid = self.current.pop(oid)
+        # XXX Want to fetch object without marking it as accessed
+        o = self.fc.access((oid, cur_tid))
+        if o is None:
+            # XXX is this possible?
+            return None
+        o.end_tid = tid
+        self.fc.update(o)
+        self._trace(0x1C, oid, version, tid)
+        L = self.noncurrent.setdefault(oid, [])
+        bisect.insort_left(L, (cur_tid, tid))
+
+    ##
+    # Return the number of object revisions in the cache.
+
+    # XXX just return len(self.fc)?
+
+    def __len__(self):
+        n = len(self.current) + len(self.version)
+        if self.noncurrent:
+            n += sum(map(len, self.noncurrent))
+        return n
+
+    ##
+    # Generate oid, tid, version triples for all objects in the
+    # cache.  This generator is used by cache verification.
+
+    def contents(self):
+        # XXX May need to materialize list instead of iterating,
+        # depends on whether the caller may change the cache.
+        for o in self.fc:
+            oid, tid = o.key
+            if oid in self.version:
+                obj = self.fc.access(o.key)
+                yield oid, tid, obj.version
+            else:
+                yield oid, tid, ""
+
+    def dump(self):
+        from ZODB.utils import oid_repr
+        print "cache size", len(self)
+        L = list(self.contents())
+        L.sort()
+        for oid, tid, version in L:
+            print oid_repr(oid), oid_repr(tid), repr(version)
+        print "dll contents"
+        L = list(self.fc)
+        L.sort(lambda x,y:cmp(x.key, y.key))
+        for x in L:
+            end_tid = x.end_tid or z64
+            print oid_repr(x.key[0]), oid_repr(x.key[1]), oid_repr(end_tid)
+        print
+
+    def _evicted(self, o):
+        # Called by Object o to signal its eviction
+        oid, tid = o.key
+        if o.end_tid is None:
+            if o.version:
+                del self.version[oid]
+            else:
+                del self.current[oid]
+        else:
+            # XXX Although we use bisect to keep the list sorted,
+            # we never expect the list to be very long.  So the
+            # brute force approach should normally be fine.
+            L = self.noncurrent[oid]
+            L.remove((o.start_tid, o.end_tid))
+
+    def _setup_trace(self):
+        tfn = self.path + ".trace"
+        self.tracefile = None
+        try:
+            self.tracefile = open(tfn, "ab")
+            self._trace(0x00)
+        except IOError, msg:
+            self.tracefile = None
+            self.log.warning("Could not write to trace file %s: %s",
+                             tfn, msg)
+
+    def _notrace(self, *arg, **kwargs):
+        pass
+
+    def _trace(self,
+               code, oid="", version="", tid="", end_tid=z64, dlen=0,
+               # The next two are just speed hacks.
+               time_time=time.time, struct_pack=struct.pack):
+        # The code argument is two hex digits; bits 0 and 7 must be zero.
+        # The first hex digit shows the operation, the second the outcome.
+        # If the second digit is in "02468" then it is a 'miss'.
+        # If it is in "ACE" then it is a 'hit'.
+        # This method has been carefully tuned to be as fast as possible.
+        # Note: when tracing is disabled, this method is hidden by a dummy.
+        if version:
+            code |= 0x80
+        encoded = (dlen + 255) & 0x7fffff00 | code
+        if tid is None:
+            tid = z64
+        if end_tid is None:
+            end_tid = z64
+        try:
+            self.tracefile.write(
+                struct_pack(">iiH8s8s",
+                            time_time(),
+                            encoded,
+                            len(oid),
+                            tid, end_tid) + oid)
+        except:
+            print `tid`, `end_tid`
+            raise
+
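Each trace record is therefore 26 fixed bytes followed by the oid. A sketch of reading one record back (essentially what the reader in ZEO/stats.py does; illustrative only):

    import struct

    def read_trace_record(f):
        fixed = f.read(26)            # ">iiH8s8s" is 4+4+2+8+8 bytes
        if len(fixed) < 26:
            return None
        ts, encoded, oidlen, tid, end_tid = struct.unpack(">iiH8s8s", fixed)
        code = encoded & 0xff         # low byte: trace code + version bit
        in_version = bool(code & 0x80)
        code &= 0x7e
        dlen = encoded & 0x7fffff00   # data length, rounded up by the
                                      # writer to a multiple of 256
        oid = f.read(oidlen)
        return ts, code, in_version, dlen, tid, end_tid, oid
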
+##
+# An Object stores the cached data for a single object.
+# <p>
+# The cached data includes the actual object data, the key, and three
+# data fields that describe the validity period and version of the
+# object.  The key contains the oid and a redundant start_tid.  The
+# actual size of an object is variable, depending on the size of the
+# data and whether it is in a version.
+# <p>
+# The serialized format does not include the key, because it is stored
+# in the header used by the cache's storage format.
+
+class Object(object):
+    __slots__ = (# pair, object id, txn id -- something usable as a dict key
+                 # the second part of the pair is equal to start_tid below
+                 "key",
+
+                 "start_tid", # string, id of txn that wrote the data
+                 "end_tid", # string, id of txn that wrote next revision
+                            # or None
+                 "version", # string, name of version
+                 "data", # string, the actual data record for the object
+
+                 "size", # total size of serialized object
+                )
+
+    def __init__(self, key, version, data, start_tid, end_tid):
+        self.key = key
+        self.version = version
+        self.data = data
+        self.start_tid = start_tid
+        self.end_tid = end_tid
+        # The size of the serialized object on disk, including the
+        # 14-byte header, the length of data and version, and a
+        # copy of the 8-byte oid.
+        if data is not None:
+            self.size = 22 + len(data) + len(version)
+
+    # The serialization format uses an end tid of "\0" * 8, the least
+    # 8-byte string, to represent None.  It isn't possible for an
+    # end_tid to be 0, because it must always be strictly greater
+    # than the start_tid.
+
+    fmt = ">8shi"
+
+    def serialize(self, f):
+        # Write standard form of Object to file, f.
+        self.serialize_header(f)
+        f.write(self.data)
+        f.write(struct.pack(">8s", self.key[0]))
+
+    def serialize_header(self, f):
+        s = struct.pack(self.fmt, self.end_tid or "\0" * 8,
+                        len(self.version), len(self.data))
+        f.write(s)
+        f.write(self.version)
+
+    def fromFile(cls, f, key, header_only=False):
+        s = f.read(struct.calcsize(cls.fmt))
+        if not s:
+            return None
+        oid, start_tid = key
+        end_tid, vlen, dlen = struct.unpack(cls.fmt, s)
+        if end_tid == z64:
+            end_tid = None
+        version = f.read(vlen)
+        if vlen != len(version):
+            raise ValueError("corrupted record, version")
+        if header_only:
+            data = None
+        else:
+            data = f.read(dlen)
+            if dlen != len(data):
+                raise ValueError("corrupted record, data")
+            s = f.read(8)
+            if struct.pack(">8s", s) != oid:
+                raise ValueError("corrupted record, oid")
+        return cls((oid, start_tid), version, data, start_tid, end_tid)
+
+    fromFile = classmethod(fromFile)
+
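A round-trip sketch of the record format (a 14-byte ">8shi" header holding end_tid, version length, and data length, then the version string, the data, and a redundant 8-byte copy of the oid); illustrative only:

    from StringIO import StringIO
    from ZEO.cache import Object

    oid = "\0" * 8
    t1 = "\0" * 7 + "\1"
    o = Object((oid, t1), "", "some pickle", t1, None)
    f = StringIO()
    o.serialize(f)        # writes 14 + 0 + 11 + 8 bytes == o.size == 33
    f.seek(0)
    o2 = Object.fromFile(f, (oid, t1))
    assert (o2.data, o2.end_tid) == ("some pickle", None)
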
+def sync(f):
+    f.flush()
+    if hasattr(os, 'fsync'):
+        os.fsync(f.fileno())
+
+class Entry(object):
+    __slots__ = (# object key -- something usable as a dict key.
+                 'key',
+
+                 # Offset from start of file to the object's data
+                 # record; this includes all overhead bytes (status
+                 # byte, size bytes, etc).  The size of the data
+                 # record is stored in the file near the start of the
+                 # record, but for efficiency we also keep size in a
+                 # dict (filemap; see later).
+                 'offset',
+                )
+
+    def __init__(self, key=None, offset=None):
+        self.key = key
+        self.offset = offset
+
+
+magic = "ZEC3"
+
+OBJECT_HEADER_SIZE = 1 + 4 + 16
+
+##
+# FileCache stores a cache in a single on-disk file.
+#
+# On-disk cache structure
+#
+# The file begins with a 12-byte header.  The first four bytes are the
+# file's magic number - ZEC3 - indicating zeo cache version 3.  The
+# next eight bytes are the last transaction id.
+#
+# The file is a contiguous sequence of blocks.  All blocks begin with
+# a one-byte status indicator:
+#
+# 'a'
+#       Allocated.  The block holds an object; the next 4 bytes are >I
+#       format total block size.
+#
+# 'f'
+#       Free.  The block is free; the next 4 bytes are >I format total
+#       block size.
+#
+# '1', '2', '3', '4'
+#       The block is free, and consists of 1, 2, 3 or 4 bytes total.
+#
+# 'Z'
+#       File header.  The file starts with a magic number, currently
+#       'ZEC3' and an 8-byte transaction id.
+#
+# "Total" includes the status byte, and size bytes.  There are no
+# empty (size 0) blocks.
+
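Given that layout, the file can be walked with a few lines of code; this is a stripped-down sketch of what FileCache.scan() below does:

    import struct

    def blocks(f, fsize):
        # Yield (offset, status, size) for each block in a cache file.
        ofs = 12                  # skip the 4-byte magic and 8-byte tid
        while ofs < fsize:
            f.seek(ofs)
            status = f.read(1)
            if status in 'af':    # allocated/free: 4-byte size follows
                size, = struct.unpack(">I", f.read(4))
            else:                 # '1'..'4': the block is just that long
                size = int(status)
            yield ofs, status, size
            ofs += size
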
+
+# XXX This needs a lot more hair.
+# The structure of an allocated block is more complicated:
+#
+#     1 byte allocation status ('a').
+#     4 bytes block size, >I format.
+#     16 bytes oid + tid, string.
+#     size-OBJECT_HEADER_SIZE bytes, the object pickle.
+
+# The cache's currentofs goes around the file, circularly, forever.
+# It's always the starting offset of some block.
+#
+# When a new object is added to the cache, it's stored beginning at
+# currentofs, and currentofs moves just beyond it.  As many contiguous
+# blocks needed to make enough room for the new object are evicted,
+# starting at currentofs.  Exception:  if currentofs is close enough
+# to the end of the file that the new object can't fit in one
+# contiguous chunk, currentofs is reset to 0 first.
+
+# Do everything possible to ensure that the bytes we wrote are really
+# on disk.
+
+class FileCache(object):
+    
+    def __init__(self, maxsize, fpath, parent, reuse=True):
+        # Maximum total of object sizes we keep in cache.
+        self.maxsize = maxsize
+        # Current total of object sizes in cache.
+        self.currentsize = 0
+        self.parent = parent
+        self.tid = None
+
+        # Map offset in file to pair (data record size, Entry).
+        # Entry is None iff the block starting at offset is free.
+        # filemap always contains a complete account of what's in the
+        # file -- study method _verify_filemap for executable checking
+        # of the relevant invariants.  An offset is at the start of a
+        # block iff it's a key in filemap.
+        self.filemap = {}
+
+        # Map key to Entry.  There's one entry for each object in the
+        # cache file.  After
+        #     obj = key2entry[key]
+        # then
+        #     obj.key == key
+        # is true.
+        self.key2entry = {}
+
+        # Always the offset into the file of the start of a block.
+        # New and relocated objects are always written starting at
+        # currentofs.
+        self.currentofs = 12
+
+        self.fpath = fpath
+        if not reuse or not fpath or not os.path.exists(fpath):
+            self.new = True
+            if fpath:
+                self.f = file(fpath, 'wb+')
+            else:
+                self.f = tempfile.TemporaryFile()
+            # Make sure the OS really saves enough bytes for the file.
+            self.f.seek(self.maxsize - 1)
+            self.f.write('x')
+            self.f.truncate()
+            # Start with one magic header block
+            self.f.seek(0)
+            self.f.write(magic)
+            self.f.write(z64)
+            # and one free block.
+            self.f.write('f' + struct.pack(">I", self.maxsize - 12))
+            self.sync()
+            self.filemap[12] = self.maxsize - 12, None
+        else:
+            self.new = False
+            self.f = None
+        
+        # Statistics:  _n_adds, _n_added_bytes,
+        #              _n_evicts, _n_evicted_bytes
+        self.clearStats()
+
+    # Scan the current contents of the cache file, calling install
+    # for each object found in the cache.  This method should only
+    # be called once to initialize the cache from disk.
+
+    def scan(self, install):
+        if self.new:
+            return
+        fsize = os.path.getsize(self.fpath)
+        self.f = file(self.fpath, 'rb+')
+        _magic = self.f.read(4)
+        if _magic != magic:
+            raise ValueError("unexpected magic number: %r" % _magic)
+        self.tid = self.f.read(8)
+        # Remember the largest free block.  That seems a
+        # decent place to start currentofs.
+        max_free_size = max_free_offset = 0
+        ofs = 12
+        while ofs < fsize:
+            self.f.seek(ofs)
+            ent = None
+            status = self.f.read(1)
+            if status == 'a':
+                size, rawkey = struct.unpack(">I16s", self.f.read(20))
+                key = rawkey[:8], rawkey[8:]
+                assert key not in self.key2entry
+                self.key2entry[key] = ent = Entry(key, ofs)
+                install(self.f, ent)
+            elif status == 'f':
+                size, = struct.unpack(">I", self.f.read(4))
+            elif status in '1234':
+                size = int(status)
+            else:
+                assert 0, status
+
+            self.filemap[ofs] = size, ent
+            if ent is None and size > max_free_size:
+                max_free_size, max_free_offset = size, ofs
+
+            ofs += size
+
+        assert ofs == fsize
+        if __debug__:
+            self._verify_filemap()
+        self.currentofs = max_free_offset
+
+    def clearStats(self):
+        self._n_adds = self._n_added_bytes = 0
+        self._n_evicts = self._n_evicted_bytes = 0
+        self._n_removes = self._n_removed_bytes = 0
+        self._n_accesses = 0
+
+    def getStats(self):
+        return (self._n_adds, self._n_added_bytes,
+                self._n_evicts, self._n_evicted_bytes,
+                self._n_removes, self._n_removed_bytes,
+                self._n_accesses
+               )
+
+    def __len__(self):
+        return len(self.key2entry)
+
+    def __iter__(self):
+        return self.key2entry.itervalues()
+
+    def __contains__(self, key):
+        return key in self.key2entry
+
+    def sync(self):
+        sync(self.f)
+
+    def close(self):
+        if self.f:
+            self.sync()
+            self.f.close()
+            self.f = None
+
+    # Evict objects as necessary to free up at least nbytes bytes,
+    # starting at currentofs.  If currentofs is closer than nbytes to
+    # the end of the file, currentofs is reset to 0.  The number of
+    # bytes actually freed may be (and probably will be) greater than
+    # nbytes, and is _makeroom's return value.  The file is not
+    # altered by _makeroom.  filemap is updated to reflect the
+    # evictions, and it's the caller's responsibility both to fiddle
+    # the file, and to update filemap, to account for all the space
+    # freed (starting at currentofs when _makeroom returns, and
+    # spanning the number of bytes returned by _makeroom).
+
+    def _makeroom(self, nbytes):
+        assert 0 < nbytes <= self.maxsize
+        if self.currentofs + nbytes > self.maxsize:
+            self.currentofs = 12
+        ofs = self.currentofs
+        while nbytes > 0:
+            size, e = self.filemap.pop(ofs)
+            if e is not None:
+                self._evictobj(e, size)
+            ofs += size
+            nbytes -= size
+        return ofs - self.currentofs
+
+    # Write Object obj, with data, to file starting at currentofs.
+    # nfreebytes are already available for overwriting, and it's
+    # guaranteed that's enough.  obj.offset is changed to reflect the
+    # new data record position, and filemap is updated to match.
+
+    def _writeobj(self, obj, nfreebytes):
+        size = OBJECT_HEADER_SIZE + obj.size
+        assert size <= nfreebytes
+        excess = nfreebytes - size
+        # If there's any excess (which is likely), we need to record a
+        # free block following the end of the data record.  That isn't
+        # expensive -- it's all a contiguous write.
+        if excess == 0:
+            extra = ''
+        elif excess < 5:
+            extra = "01234"[excess]
+        else:
+            extra = 'f' + struct.pack(">I", excess)
+
+        self.f.seek(self.currentofs)
+        self.f.writelines(('a',
+                           struct.pack(">I8s8s", size,
+                                       obj.key[0], obj.key[1])))
+        obj.serialize(self.f)
+        self.f.write(extra)
+        e = Entry(obj.key, self.currentofs)
+        self.key2entry[obj.key] = e
+        self.filemap[self.currentofs] = size, e
+        self.currentofs += size
+        if excess:
+            # We need to record the free block in filemap, but there's
+            # no need to advance currentofs beyond it.  Instead it
+            # gives some breathing room for the next object to get
+            # written.
+            self.filemap[self.currentofs] = excess, None
+
+    def add(self, object):
+        size = OBJECT_HEADER_SIZE + object.size
+        if size > self.maxsize:
+            return
+        assert size <= self.maxsize
+
+        assert object.key not in self.key2entry
+        assert len(object.key[0]) == 8
+        assert len(object.key[1]) == 8
+
+        self._n_adds += 1
+        self._n_added_bytes += size
+
+        available = self._makeroom(size)
+        self._writeobj(object, available)
+
+    def _verify_filemap(self, display=False):
+        a = 12
+        f = self.f
+        while a < self.maxsize:
+            f.seek(a)
+            status = f.read(1)
+            if status in 'af':
+                size, = struct.unpack(">I", f.read(4))
+            else:
+                size = int(status)
+            if display:
+                if a == self.currentofs:
+                    print '*****',
+                print "%c%d" % (status, size),
+            size2, obj = self.filemap[a]
+            assert size == size2
+            assert (obj is not None) == (status == 'a')
+            if obj is not None:
+                assert obj.offset == a
+                assert self.key2entry[obj.key] is obj
+            a += size
+        if display:
+            print
+        assert a == self.maxsize
+
+    def _evictobj(self, e, size):
+        self._n_evicts += 1
+        self._n_evicted_bytes += size
+        # Load the object header into memory so we know how to
+        # update the parent's in-memory data structures.
+        self.f.seek(e.offset + OBJECT_HEADER_SIZE)
+        o = Object.fromFile(self.f, e.key, header_only=True)
+        self.parent._evicted(o)
+
+    ##
+    # Return object for key or None if not in cache.
+
+    def access(self, key):
+        self._n_accesses += 1
+        e = self.key2entry.get(key)
+        if e is None:
+            return None
+        offset = e.offset
+        size, e2 = self.filemap[offset]
+        assert e is e2
+
+        self.f.seek(offset + OBJECT_HEADER_SIZE)
+        return Object.fromFile(self.f, key)
+
+    ##
+    # Remove object for key from cache, if present.
+
+    def remove(self, key):
+        # If an object is being explicitly removed, we need to load
+        # its header into memory and write a free block marker to the
+        # disk where the object was stored.  We need to load the
+        # header to update the in-memory data structures held by
+        # ClientCache.
+        
+        # XXX Or we could just keep the header in memory at all times.
+
+        e = self.key2entry.get(key)
+        if e is None:
+            return
+        offset = e.offset
+        size, e2 = self.filemap[offset]
+        self.f.seek(offset + OBJECT_HEADER_SIZE)
+        o = Object.fromFile(self.f, key, header_only=True)
+        self.f.seek(offset + OBJECT_HEADER_SIZE)
+        self.f.write('f')
+        self.f.flush()
+        self.parent._evicted(o)
+        self.filemap[offset] = size, None
+
+    ##
+    # Update on-disk representation of obj.
+    #
+    # This method should be called when the object header is modified.
+
+    def update(self, obj):
+        
+        e = self.key2entry[obj.key]
+        self.f.seek(e.offset + OBJECT_HEADER_SIZE)
+        obj.serialize_header(self.f)
+        
+    def settid(self, tid):
+        if self.tid is not None:
+            if tid < self.tid:
+                raise ValueError(
+                    "new last tid must be greater that previous one")
+        self.tid = tid
+        self.f.seek(4)
+        self.f.write(tid)
+        self.f.flush()


=== ZODB3/ZEO/stats.py 1.21 => 1.22 ===
--- ZODB3/ZEO/stats.py:1.21	Tue Jun 10 13:08:10 2003
+++ ZODB3/ZEO/stats.py	Wed Dec 24 11:02:03 2003
@@ -128,15 +128,21 @@
 
     # Read file, gathering statistics, and printing each record if verbose
     rt0 = time.time()
+    # bycode -- map code to count of occurrences
     bycode = {}
+    # records -- number of records
     records = 0
+    # versions -- number of records with versions
     versions = 0
     t0 = te = None
+    # datarecords -- number of records with dlen set
     datarecords = 0
     datasize = 0L
-    file0 = file1 = 0
+    # oids -- maps oid to number of times it was loaded
     oids = {}
+    # bysize -- maps data size to number of loads
     bysize = {}
+    # bysizew -- maps data size to number of writes
     bysizew = {}
     total_loads = 0
     byinterval = {}
@@ -157,12 +163,12 @@
                 if not quiet:
                     print "Skipping 8 bytes at offset", offset-8
                 continue
-            r = f_read(10)
+            r = f_read(18)
             if len(r) < 10:
                 break
             offset += 10
             records += 1
-            oidlen, serial = struct_unpack(">H8s", r)
+            oidlen, start_tid, end_tid = struct_unpack(">H8s8s", r)
             oid = f_read(oidlen)
             if len(oid) != oidlen:
                 break
@@ -187,11 +193,6 @@
             if code & 0x80:
                 version = 'V'
                 versions += 1
-            current = code & 1
-            if current:
-                file1 += 1
-            else:
-                file0 += 1
             code = code & 0x7e
             bycode[code] = bycode.get(code, 0) + 1
             byinterval[code] = byinterval.get(code, 0) + 1
@@ -199,22 +200,23 @@
                 if code & 0x70 == 0x20: # All loads
                     bysize[dlen] = d = bysize.get(dlen) or {}
                     d[oid] = d.get(oid, 0) + 1
-                elif code == 0x3A: # Update
+                elif code & 0x70 == 0x50: # All stores
                     bysizew[dlen] = d = bysizew.get(dlen) or {}
                     d[oid] = d.get(oid, 0) + 1
             if verbose:
-                print "%s %d %02x %s %016x %1s %s" % (
+                print "%s %d %02x %s %016x %016x %1s %s" % (
                     time.ctime(ts)[4:-5],
                     current,
                     code,
                     oid_repr(oid),
-                    U64(serial),
+                    U64(start_tid),
+                    U64(end_tid),
                     version,
                     dlen and str(dlen) or "")
             if code & 0x70 == 0x20:
                 oids[oid] = oids.get(oid, 0) + 1
                 total_loads += 1
-            if code in (0x00, 0x70):
+            if code == 0x00:
                 if not quiet:
                     dumpbyinterval(byinterval, h0, he)
                 byinterval = {}
@@ -222,10 +224,7 @@
                 h0 = he = ts
                 if not quiet:
                     print time.ctime(ts)[4:-5],
-                    if code == 0x00:
-                        print '='*20, "Restart", '='*20
-                    else:
-                        print '-'*20, "Flip->%d" % current, '-'*20
+                    print '='*20, "Restart", '='*20
     except KeyboardInterrupt:
         print "\nInterrupted.  Stats so far:\n"
 
@@ -248,8 +247,6 @@
         print "First time: %s" % time.ctime(t0)
         print "Last time:  %s" % time.ctime(te)
         print "Duration:   %s seconds" % addcommas(te-t0)
-        print "File stats: %s in file 0; %s in file 1" % (
-            addcommas(file0), addcommas(file1))
         print "Data recs:  %s (%.1f%%), average size %.1f KB" % (
             addcommas(datarecords),
             100.0 * datarecords / records,
@@ -314,7 +311,7 @@
         if code & 0x70 == 0x20:
             n = byinterval[code]
             loads += n
-            if code in (0x2A, 0x2C, 0x2E):
+            if code in (0x22, 0x26):
                 hits += n
     if not loads:
         return
@@ -333,7 +330,7 @@
         if code & 0x70 == 0x20:
             n = bycode[code]
             loads += n
-            if code in (0x2A, 0x2C, 0x2E):
+            if code in (0x22, 0x26):
                 hits += n
     if loads:
         return 100.0 * hits / loads
@@ -376,31 +373,18 @@
     0x00: "_setup_trace (initialization)",
 
     0x10: "invalidate (miss)",
-    0x1A: "invalidate (hit, version, writing 'n')",
-    0x1C: "invalidate (hit, writing 'i')",
+    0x1A: "invalidate (hit, version)",
+    0x1C: "invalidate (hit, saving non-current)",
 
     0x20: "load (miss)",
-    0x22: "load (miss, version, status 'n')",
-    0x24: "load (miss, deleting index entry)",
-    0x26: "load (miss, no non-version data)",
-    0x28: "load (miss, version mismatch, no non-version data)",
-    0x2A: "load (hit, returning non-version data)",
-    0x2C: "load (hit, version mismatch, returning non-version data)",
-    0x2E: "load (hit, returning version data)",
-
-    0x3A: "update",
-
-    0x40: "modifiedInVersion (miss)",
-    0x4A: "modifiedInVersion (hit, return None, status 'n')",
-    0x4C: "modifiedInVersion (hit, return '')",
-    0x4E: "modifiedInVersion (hit, return version)",
+    0x22: "load (hit)",
+    0x24: "load (non-current, miss)",
+    0x26: "load (non-current, hit)",
+
+    0x50: "store (version)",
+    0x52: "store (current, non-version)",
+    0x54: "store (non-current)",
 
-    0x5A: "store (non-version data present)",
-    0x5C: "store (only version data present)",
-
-    0x6A: "_copytocurrent",
-
-    0x70: "checkSize (cache flip)",
     }
 
 if __name__ == "__main__":

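With the new code table, hit detection is simple: every load event has high nibble 0x2, and the hit codes are 0x22 (load, hit) and 0x26 (load non-current, hit). A sketch of the hit-rate arithmetic used above, given a hypothetical bycode mapping of trace code to occurrence count:

    bycode = {0x20: 30, 0x22: 60, 0x24: 4, 0x26: 6, 0x52: 25}
    loads = sum([n for code, n in bycode.items() if code & 0x70 == 0x20])
    hits = bycode.get(0x22, 0) + bycode.get(0x26, 0)
    print "hit rate: %.1f%%" % (100.0 * hits / loads)  # 66/100 -> 66.0%
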

=== ZODB3/ZEO/StorageServer.py 1.103 => 1.104 ===
--- ZODB3/ZEO/StorageServer.py:1.103	Mon Nov 24 16:27:51 2003
+++ ZODB3/ZEO/StorageServer.py	Wed Dec 24 11:02:03 2003
@@ -235,6 +235,14 @@
     def getExtensionMethods(self):
         return self._extensions
 
+    def loadEx(self, oid, version):
+        self.stats.loads += 1
+        return self.storage.loadEx(oid, version)
+
+    def loadBefore(self, oid, tid):
+        self.stats.loads += 1
+        return self.storage.loadBefore(oid, tid)
+
     def zeoLoad(self, oid):
         self.stats.loads += 1
         v = self.storage.modifiedInVersion(oid)
@@ -260,12 +268,26 @@
                  % (len(invlist), u64(invtid)))
         return invtid, invlist
 
+    def verify(self, oid, version, tid):
+        try:
+            t = self.storage.getTid(oid)
+        except KeyError:
+            self.client.invalidateVerify((oid, ""))
+        else:
+            if tid != t:
+                # This will invalidate non-version data when the
+                # client only has invalid version data.  Since this is
+                # an uncommon case, we avoid the cost of checking
+                # whether the serial number matches the current
+                # non-version data.
+                self.client.invalidateVerify((oid, version))
+
     def zeoVerify(self, oid, s, sv):
         if not self.verifying:
             self.verifying = 1
             self.stats.verifying_clients += 1
         try:
-            os = self.storage.getSerial(oid)
+            os = self.storage.getTid(oid)
         except KeyError:
             self.client.invalidateVerify((oid, ''))
             # XXX It's not clear what we should do now.  The KeyError
@@ -344,7 +366,7 @@
     def undoLog(self, first, last):
         return run_in_thread(self.storage.undoLog, first, last)
 
-    def tpc_begin(self, id, user, description, ext, tid, status):
+    def tpc_begin(self, id, user, description, ext, tid=None, status=" "):
         if self.read_only:
             raise ReadOnlyError()
         if self.transaction is not None:
@@ -521,25 +543,25 @@
         return self.storage.tpc_vote(self.transaction)
 
     def _abortVersion(self, src):
-        oids = self.storage.abortVersion(src, self.transaction)
+        tid, oids = self.storage.abortVersion(src, self.transaction)
         inv = [(oid, src) for oid in oids]
         self.invalidated.extend(inv)
-        return oids
+        return tid, oids
 
     def _commitVersion(self, src, dest):
-        oids = self.storage.commitVersion(src, dest, self.transaction)
+        tid, oids = self.storage.commitVersion(src, dest, self.transaction)
         inv = [(oid, dest) for oid in oids]
         self.invalidated.extend(inv)
         if dest:
             inv = [(oid, src) for oid in oids]
             self.invalidated.extend(inv)
-        return oids
+        return tid, oids
 
     def _transactionalUndo(self, trans_id):
-        oids = self.storage.transactionalUndo(trans_id, self.transaction)
+        tid, oids = self.storage.transactionalUndo(trans_id, self.transaction)
         inv = [(oid, None) for oid in oids]
         self.invalidated.extend(inv)
-        return oids
+        return tid, oids
 
     # When a delayed transaction is restarted, the dance is
     # complicated.  The restart occurs when one ZEOStorage instance
@@ -853,6 +875,9 @@
         if earliest_tid > tid:
             log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
             return None, []
+
+        # XXX this is wrong!  must check against tid or we invalidate
+        # too much.
 
         oids = {}
         for tid, L in self.invq:


=== ZODB3/ZEO/ServerStub.py 1.17 => 1.18 ===
--- ZODB3/ZEO/ServerStub.py:1.17	Thu Oct  2 14:17:22 2003
+++ ZODB3/ZEO/ServerStub.py	Wed Dec 24 11:02:03 2003
@@ -13,6 +13,18 @@
 ##############################################################################
 """RPC stubs for interface exported by StorageServer."""
 
+##
+# ZEO storage server.
+# <p>
+# Remote method calls can be synchronous or asynchronous.  If the call
+# is synchronous, the client thread blocks until the call returns.  A
+# single client can only have one synchronous request outstanding.  If
+# several threads share a single client, threads other than the caller
+# will block only if they attempt to make another synchronous call.
+# An asynchronous call does not cause the client thread to block.  An
+# exception raised by an asynchronous method is logged on the server,
+# but is not returned to the client.
+
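A small illustration of the distinction, using two methods of the stub below (the rpc object is assumed to provide call() and callAsync() as used throughout this class):

    stub = StorageServer(rpc)
    info = stub.get_info()   # synchronous: blocks until the reply arrives
    stub.endZeoVerify()      # asynchronous: returns at once; a server-side
                             # exception is logged there, not raised here
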
 class StorageServer:
 
     """An RPC stub class for the interface exported by ClientStorage.
@@ -43,46 +55,173 @@
     def extensionMethod(self, name):
         return ExtensionMethodWrapper(self.rpc, name).call
 
+    ##
+    # Register current connection with a storage and a mode.
+    # In effect, it is like an open call.
+    # @param storage_name a string naming the storage.  This argument
+    #        is primarily for backwards compatibility with servers
+    #        that supported multiple storages.
+    # @param read_only boolean
+    # @exception ValueError unknown storage_name or already registered
+    # @exception ReadOnlyError storage is read-only and a read-write
+    #            connection was requested
+
     def register(self, storage_name, read_only):
         self.rpc.call('register', storage_name, read_only)
 
+    ##
+    # Return dictionary of meta-data about the storage.
+    # @defreturn dict
+
     def get_info(self):
         return self.rpc.call('get_info')
 
+    ##
+    # Check whether the server requires authentication.  Returns
+    # the name of the protocol.
+    # @defreturn string
+
     def getAuthProtocol(self):
         return self.rpc.call('getAuthProtocol')
 
+    ##
+    # Return the id of the last committed transaction.
+    # @defreturn string
+
     def lastTransaction(self):
         # Not in protocol version 2.0.0; see __init__()
         return self.rpc.call('lastTransaction')
 
+    ##
+    # Return invalidations for all transactions after tid.
+    # @param tid transaction id
+    # @defreturn 2-tuple, (tid, list)
+    # @return tuple containing the last committed transaction
+    #         and a list of oids that were invalidated.  Returns
+    #         None and an empty list if the server does not have
+    #         the list of oids available.
+
     def getInvalidations(self, tid):
         # Not in protocol version 2.0.0; see __init__()
         return self.rpc.call('getInvalidations', tid)
 
+    ##
+    # Check whether serial numbers s and sv are current for oid.
+    # If one or both of the serial numbers are not current, the
+    # server will make an asynchronous invalidateVerify() call.
+    # @param oid object id
+    # @param s serial number of non-version data
+    # @param sv serial number of version data or None
+    # @defreturn async
+
     def zeoVerify(self, oid, s, sv):
         self.rpc.callAsync('zeoVerify', oid, s, sv)
 
+    ##
+    # Check whether current serial number is valid for oid and version.
+    # If the serial number is not current, the server will make an
+    # asynchronous invalidateVerify() call.
+    # @param oid object id
+    # @param version name of version for oid
+    # @param serial client's current serial number
+    # @defreturn async
+
+    def verify(self, oid, version, serial):
+        self.rpc.callAsync('verify', oid, version, serial)
+
+    ##
+    # Signal to the server that cache verification is done.
+    # @defreturn async
+
     def endZeoVerify(self):
         self.rpc.callAsync('endZeoVerify')
 
+    ##
+    # Generate a new set of oids.
+    # @param n number of new oids to return
+    # @defreturn list
+    # @return list of oids
+
     def new_oids(self, n=None):
         if n is None:
             return self.rpc.call('new_oids')
         else:
             return self.rpc.call('new_oids', n)
 
+    ##
+    # Pack the storage.
+    # @param t pack time
+    # @param wait optional, boolean.  If true, the call will not
+    #             return until the pack is complete.
+
     def pack(self, t, wait=None):
         if wait is None:
             self.rpc.call('pack', t)
         else:
             self.rpc.call('pack', t, wait)
 
+    ##
+    # Return current data for oid.  Version data is returned if
+    # present.
+    # @param oid object id
+    # @defreturn 5-tuple
+    # @return 5-tuple, current non-version data, serial number,
+    #         version name, version data, version data serial number
+    # @exception KeyError if oid is not found
+
     def zeoLoad(self, oid):
         return self.rpc.call('zeoLoad', oid)
 
+    ##
+    # Return current data for oid along with the tid of the transaction
+    # that wrote the data.
+    # @param oid object id
+    # @param version string, name of version
+    # @defreturn 3-tuple
+    # @return data, transaction id, and version,
+    #         where version is the name of the version the data came
+    #         from or "" for non-version data
+    # @exception KeyError if oid is not found
+
+    def loadEx(self, oid, version):
+        return self.rpc.call("loadEx", oid, version)
+
+    ##
+    # Return non-current data along with transaction ids that identify
+    # the lifetime of the specific revision.
+    # @param oid object id
+    # @param tid a transaction id that provides an upper bound on
+    #            the lifetime of the revision.  That is, loadBefore
+    #            returns the revision that was current before tid committed.
+    # @defreturn 3-tuple
+    # @return data, start transaction id, end transaction id
+
+    def loadBefore(self, oid, tid):
+        return self.rpc.call("loadBefore", oid, tid)
+
+    ##
+    # Store a new revision of oid.
+    # @param oid object id
+    # @param serial serial number that this transaction read
+    # @param data new data record for oid
+    # @param version name of version or ""
+    # @param id id of current transaction
+    # @defreturn async
+
     def storea(self, oid, serial, data, version, id):
         self.rpc.callAsync('storea', oid, serial, data, version, id)
+
+    ##
+    # Start two-phase commit for a transaction
+    # @param id id used by client to identify current transaction.  The
+    #        only purpose of this argument is to distinguish among multiple
+    #        threads using a single ClientStorage.
+    # @param user name of user committing transaction (can be "")
+    # @param description string containing transaction metadata (can be "")
+    # @param ext dictionary of extended metadata (?)
+    # @param tid optional explicit tid to pass to underlying storage
+    # @param status optional status character, e.g. "p" for pack
+    # @defreturn async
 
     def tpc_begin(self, id, user, descr, ext, tid, status):
         return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)


=== ZODB3/ZEO/ClientStorage.py 1.112 => 1.113 ===
--- ZODB3/ZEO/ClientStorage.py:1.112	Fri Nov 28 11:44:47 2003
+++ ZODB3/ZEO/ClientStorage.py	Wed Dec 24 11:02:03 2003
@@ -26,7 +26,8 @@
 import time
 import types
 
-from ZEO import ClientCache, ServerStub
+from ZEO import ServerStub
+from ZEO.cache import ClientCache
 from ZEO.TransactionBuffer import TransactionBuffer
 from ZEO.Exceptions import ClientStorageError, UnrecognizedResult, \
      ClientDisconnected, AuthError
@@ -91,7 +92,7 @@
     # Classes we instantiate.  A subclass might override.
 
     TransactionBufferClass = TransactionBuffer
-    ClientCacheClass = ClientCache.ClientCache
+    ClientCacheClass = ClientCache
     ConnectionManagerClass = ConnectionManager
     StorageServerStubClass = ServerStub.StorageServer
 
@@ -252,10 +253,17 @@
 
         self._tbuf = self.TransactionBufferClass()
         self._db = None
+        self._ltid = None # the last committed transaction
 
         # _serials: stores (oid, serialno) as returned by server
         # _seriald: _check_serials() moves from _serials to _seriald,
         #           which maps oid to serialno
+
+        # XXX If serial number matches transaction id, then there is
+        # no need to have all this extra infrastructure for handling
+        # serial numbers.  The vote call can just return the tid.
+        # If there is a conflict error, we can't have a special method
+        # called just to propagate the error.
         self._serials = []
         self._seriald = {}
 
@@ -292,13 +300,15 @@
         # is executing.
         self._lock = threading.Lock()
 
-        t = self._ts = get_timestamp()
-        self._serial = `t`
-        self._oid = '\0\0\0\0\0\0\0\0'
-
         # Decide whether to use non-temporary files
-        self._cache = self.ClientCacheClass(storage, cache_size,
-                                            client=client, var=var)
+        if client is not None:
+            dir = var or os.getcwd()
+            cache_path = os.path.join(dir, "%s-%s.zec" % (client, storage))
+        else:
+            cache_path = None
+        self._cache = self.ClientCacheClass(cache_path)
+        # XXX When should it be opened?
+        self._cache.open()
 
         self._rpc_mgr = self.ConnectionManagerClass(addr, self,
                                                     tmin=min_disconnect_poll,
@@ -312,9 +322,6 @@
             # doesn't succeed, call connect() to start a thread.
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
-            # If the connect hasn't occurred, run with cached data.
-            if not self._ready.isSet():
-                self._cache.open()
 
     def _wait(self, timeout=None):
         if timeout is not None:
@@ -555,7 +562,6 @@
             if ltid == last_inval_tid:
                 log2(INFO, "No verification necessary "
                      "(last_inval_tid up-to-date)")
-                self._cache.open()
                 self._server = server
                 self._ready.set()
                 return "no verification"
@@ -569,7 +575,6 @@
             pair = server.getInvalidations(last_inval_tid)
             if pair is not None:
                 log2(INFO, "Recovering %d invalidations" % len(pair[1]))
-                self._cache.open()
                 self.invalidateTransaction(*pair)
                 self._server = server
                 self._ready.set()
@@ -581,7 +586,9 @@
         self._pickler = cPickle.Pickler(self._tfile, 1)
         self._pickler.fast = 1 # Don't use the memo
 
-        self._cache.verify(server.zeoVerify)
+        # XXX should batch these operations for efficiency
+        for oid, tid, version in self._cache.contents():
+            server.verify(oid, version, tid)
         self._pending_server = server
         server.endZeoVerify()
         return "full verification"
@@ -600,8 +607,7 @@
         This is called by ConnectionManager when the connection is
         closed or when certain problems with the connection occur.
         """
-        log2(PROBLEM, "Disconnected from storage: %s"
-             % repr(self._server_addr))
+        log2(INFO, "Disconnected from storage: %s" % repr(self._server_addr))
         self._connection = None
         self._ready.clear()
         self._server = disconnected_stub
@@ -671,10 +677,10 @@
             raise POSException.StorageTransactionError(self._transaction,
                                                        trans)
 
-    def abortVersion(self, version, transaction):
+    def abortVersion(self, version, txn):
         """Storage API: clear any changes made by the given version."""
-        self._check_trans(transaction)
-        oids = self._server.abortVersion(version, self._serial)
+        self._check_trans(txn)
+        tid, oids = self._server.abortVersion(version, id(txn))
         # When a version aborts, invalidate the version and
         # non-version data.  The non-version data should still be
         # valid, but older versions of ZODB will change the
@@ -686,28 +692,31 @@
         # we could just invalidate the version data.
         for oid in oids:
             self._tbuf.invalidate(oid, '')
-        return oids
+        return tid, oids
 
-    def commitVersion(self, source, destination, transaction):
+    def commitVersion(self, source, destination, txn):
         """Storage API: commit the source version in the destination."""
-        self._check_trans(transaction)
-        oids = self._server.commitVersion(source, destination, self._serial)
+        self._check_trans(txn)
+        tid, oids = self._server.commitVersion(source, destination, id(txn))
         if destination:
             # just invalidate our version data
             for oid in oids:
                 self._tbuf.invalidate(oid, source)
         else:
-            # destination is '', so invalidate version and non-version
+            # destination is "", so invalidate version and non-version
             for oid in oids:
-                self._tbuf.invalidate(oid, destination)
-        return oids
+                self._tbuf.invalidate(oid, "")
+        return tid, oids
 
-    def history(self, oid, version, length=1):
+    def history(self, oid, version, length=1, filter=None):
         """Storage API: return a sequence of HistoryEntry objects.
 
         This does not support the optional filter argument defined by
         the Storage API.
         """
+        if filter is not None:
+            log2(WARNING, "filter argument to history() ignored")
+        # XXX should I run filter on the results?
         return self._server.history(oid, version, length)
 
     def getSerial(self, oid):
@@ -725,11 +734,14 @@
         specified by the given object id and version, if they exist;
         otherwise a KeyError is raised.
         """
+        return self.loadEx(oid, version)[:2]
+
+    def loadEx(self, oid, version):
         self._lock.acquire()    # for atomic processing of invalidations
         try:
-            pair = self._cache.load(oid, version)
-            if pair:
-                return pair
+            t = self._cache.load(oid, version)
+            if t:
+                return t
         finally:
             self._lock.release()
 
@@ -745,25 +757,55 @@
             finally:
                 self._lock.release()
 
-            p, s, v, pv, sv = self._server.zeoLoad(oid)
+            data, tid, ver = self._server.loadEx(oid, version)
 
             self._lock.acquire()    # for atomic processing of invalidations
             try:
                 if self._load_status:
-                    self._cache.checkSize(0)
-                    self._cache.store(oid, p, s, v, pv, sv)
+                    self._cache.store(oid, ver, tid, None, data)
                 self._load_oid = None
             finally:
                 self._lock.release()
         finally:
             self._load_lock.release()
 
-        if v and version and v == version:
-            return pv, sv
-        else:
-            if s:
-                return p, s
-            raise KeyError, oid # no non-version data for this
+        return data, tid, ver
+
+    def loadBefore(self, oid, tid):
+        self._lock.acquire()
+        try:
+            t = self._cache.loadBefore(oid, tid)
+            if t is not None:
+                return t
+        finally:
+            self._lock.release()
+
+        t = self._server.loadBefore(oid, tid)
+        if t is None:
+            return None
+        data, start, end = t
+        if end is None:
+            # This method should not be used to get current data.  It
+            # doesn't use the _load_lock, so it is possible to overlap
+            # this load with an invalidation for the same object.
+
+            # XXX If we call again, we're guaranteed to get the
+            # post-invalidation data.  But if the data is still
+            # current, we'll still get end == None.
+
+            # Maybe the best thing to do is to re-run the test while
+            # holding the load lock in that case.  That hurts
+            # performance, but I don't think real application code
+            # will ever care about it.
+
+            return data, start, end
+        self._lock.acquire()
+        try:
+            self._cache.store(oid, "", start, end, data)
+        finally:
+            self._lock.release()
+
+        return data, start, end
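
The re-check contemplated in the comment above could look roughly like
this: if the server reports the revision as still current (end is
None), repeat the lookup while holding _load_lock, so a racing
invalidation cannot fall between the load and the cache store.  A
hypothetical sketch, not part of this checkin:

    def loadBefore_checked(self, oid, tid):
        # Hypothetical variant of loadBefore() that re-runs the
        # lookup under _load_lock when the first answer claims the
        # data is still current (end is None).
        t = self.loadBefore(oid, tid)
        if t is None or t[2] is not None:
            return t
        self._load_lock.acquire()
        try:
            # Under _load_lock, an invalidation for oid has either
            # already been processed (so end is now set) or must
            # wait until we return.
            return self.loadBefore(oid, tid)
        finally:
            self._load_lock.release()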
 
     def modifiedInVersion(self, oid):
         """Storage API: return the version, if any, that modfied an object.
@@ -815,6 +857,8 @@
 
     def _check_serials(self):
         """Internal helper to move data from _serials to _seriald."""
+        # XXX The serials are always going to be the same; the only
+        # question is whether an exception has been raised.
         if self._serials:
             l = len(self._serials)
             r = self._serials[:l]
@@ -825,18 +869,18 @@
                 self._seriald[oid] = s
             return r
 
-    def store(self, oid, serial, data, version, transaction):
+    def store(self, oid, serial, data, version, txn):
         """Storage API: store data for an object."""
-        self._check_trans(transaction)
-        self._server.storea(oid, serial, data, version, self._serial)
+        self._check_trans(txn)
+        self._server.storea(oid, serial, data, version, id(txn))
         self._tbuf.store(oid, version, data)
         return self._check_serials()
 
-    def tpc_vote(self, transaction):
+    def tpc_vote(self, txn):
         """Storage API: vote on a transaction."""
-        if transaction is not self._transaction:
+        if txn is not self._transaction:
             return
-        self._server.vote(self._serial)
+        self._server.vote(id(txn))
         return self._check_serials()
 
     def tpc_begin(self, txn, tid=None, status=' '):
@@ -856,15 +900,8 @@
         self._transaction = txn
         self._tpc_cond.release()
 
-        if tid is None:
-            self._ts = get_timestamp(self._ts)
-            id = `self._ts`
-        else:
-            self._ts = TimeStamp(tid)
-            id = tid
-
         try:
-            self._server.tpc_begin(id, txn.user, txn.description,
+            self._server.tpc_begin(id(txn), txn.user, txn.description,
                                    txn._extension, tid, status)
         except:
             # Client may have disconnected during the tpc_begin().
@@ -872,7 +909,6 @@
                 self.end_transaction()
             raise
 
-        self._serial = id
         self._tbuf.clear()
         self._seriald.clear()
         del self._serials[:]
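
tpc_begin() now sends id(txn) as the transaction identifier in place of
the old timestamp-derived serial.  id() is only unique among live
objects, which suffices here because the identifier is used only
between tpc_begin() and tpc_finish()/tpc_abort() of a transaction the
client keeps alive.  A tiny illustration; the Txn class is
hypothetical:

    class Txn:
        pass

    t1 = Txn()
    t2 = Txn()
    # Two live transactions always have distinct ids; an id can be
    # reused only after one of the objects is garbage collected.
    assert id(t1) != id(t2)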
@@ -881,18 +917,17 @@
         """Internal helper to end a transaction."""
        # This is the right way to set self._transaction to None: it
        # calls notify() on _tpc_cond in case there are waiting threads.
-        self._ltid = self._serial
         self._tpc_cond.acquire()
         self._transaction = None
         self._tpc_cond.notify()
         self._tpc_cond.release()
 
     def lastTransaction(self):
-        return self._ltid
+        return self._cache.getLastTid()
 
-    def tpc_abort(self, transaction):
+    def tpc_abort(self, txn):
         """Storage API: abort a transaction."""
-        if transaction is not self._transaction:
+        if txn is not self._transaction:
             return
         try:
             # XXX Are there any transactions that should prevent an
@@ -900,7 +935,7 @@
             # all, yet you want to be sure that other abort logic is
             # executed regardless.
             try:
-                self._server.tpc_abort(self._serial)
+                self._server.tpc_abort(id(txn))
             except ClientDisconnected:
                 log2(BLATHER, 'ClientDisconnected in tpc_abort() ignored')
         finally:
@@ -909,9 +944,9 @@
             del self._serials[:]
             self.end_transaction()
 
-    def tpc_finish(self, transaction, f=None):
+    def tpc_finish(self, txn, f=None):
         """Storage API: finish a transaction."""
-        if transaction is not self._transaction:
+        if txn is not self._transaction:
             return
         self._load_lock.acquire()
         try:
@@ -919,15 +954,16 @@
                 raise ClientDisconnected(
                        'Calling tpc_finish() on a disconnected transaction')
 
-            tid = self._server.tpc_finish(self._serial)
+            tid = self._server.tpc_finish(id(txn))
 
             self._lock.acquire()  # for atomic processing of invalidations
             try:
-                self._update_cache()
+                self._update_cache(tid)
                 if f is not None:
-                    f()
+                    f(tid)
             finally:
                 self._lock.release()
+            # XXX Shouldn't this cache call be made while holding the lock?
             self._cache.setLastTid(tid)
 
             r = self._check_serials()
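
A hypothetical reordering answering the XXX comment above: make the
setLastTid() call while _lock is still held, so a concurrently
processed invalidation can never observe cache contents newer than the
recorded last tid.  A sketch only, assuming the same attributes
tpc_finish() uses:

    def _finish_cache_updates(self, tid, f=None):
        # Hypothetical helper: all cache bookkeeping for a committed
        # transaction happens inside one critical section.
        self._lock.acquire()
        try:
            self._update_cache(tid)
            if f is not None:
                f(tid)
            self._cache.setLastTid(tid)
        finally:
            self._lock.release()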
@@ -936,7 +972,7 @@
             self._load_lock.release()
             self.end_transaction()
 
-    def _update_cache(self):
+    def _update_cache(self, tid):
         """Internal helper to handle objects modified by a transaction.
 
         This iterates over the objects in the transaction buffer and
@@ -949,7 +985,6 @@
         if self._cache is None:
             return
 
-        self._cache.checkSize(self._tbuf.get_size())
         try:
             self._tbuf.begin_iterate()
         except ValueError, msg:
@@ -965,18 +1000,17 @@
                     "client storage: %s" % msg)
             if t is None:
                 break
-            oid, v, p = t
-            if p is None: # an invalidation
-                s = None
-            else:
+            oid, version, data = t
+            self._cache.invalidate(oid, version, tid)
+            # If data is None, we just invalidate.
+            if data is not None:
                 s = self._seriald[oid]
-            if s == ResolvedSerial or s is None:
-                self._cache.invalidate(oid, v)
-            else:
-                self._cache.update(oid, s, v, p)
+                if s != ResolvedSerial:
+                    assert s == tid, (s, tid)
+                    self._cache.store(oid, version, s, None, data)
         self._tbuf.clear()
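
_update_cache() assumes this iteration protocol from the transaction
buffer: begin_iterate() resets the cursor, next() returns
(oid, version, data) tuples with data None for a pure invalidation,
and None marks the end of the buffer.  A minimal stand-in honoring
that contract; a hypothetical test double, not ZEO's real
TransactionBuffer:

    class FakeTBuf:
        def __init__(self, entries):
            self._entries = list(entries)
            self._pos = 0
        def begin_iterate(self):
            # The real buffer may raise ValueError here if its
            # backing file is corrupt; this double never does.
            self._pos = 0
        def next(self):
            # Return the next (oid, version, data) tuple, or None
            # when the buffer is exhausted.
            if self._pos >= len(self._entries):
                return None
            t = self._entries[self._pos]
            self._pos += 1
            return t
        def clear(self):
            self._entries = []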
 
-    def transactionalUndo(self, trans_id, trans):
+    def transactionalUndo(self, trans_id, txn):
         """Storage API: undo a transaction.
 
         This is executed in a transactional context.  It has no effect
@@ -985,24 +1019,11 @@
         Zope uses this to implement undo unless it is not supported by
         a storage.
         """
-        self._check_trans(trans)
-        oids = self._server.transactionalUndo(trans_id, self._serial)
+        self._check_trans(txn)
+        tid, oids = self._server.transactionalUndo(trans_id, id(txn))
         for oid in oids:
             self._tbuf.invalidate(oid, '')
-        return oids
-
-    def undo(self, transaction_id):
-        """Storage API: undo a transaction, writing directly to the storage."""
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        oids = self._server.undo(transaction_id)
-        self._lock.acquire()
-        try:
-            for oid in oids:
-                self._cache.invalidate(oid, '')
-        finally:
-            self._lock.release()
-        return oids
+        return tid, oids
 
     def undoInfo(self, first=0, last=-20, specification=None):
         """Storage API: return undo information."""
@@ -1059,15 +1080,15 @@
         try:
             # versions maps version names to dictionary of invalidations
             versions = {}
-            for oid, version in invs:
+            for oid, version, tid in invs:
                 if oid == self._load_oid:
                     self._load_status = 0
-                self._cache.invalidate(oid, version=version)
-                versions.setdefault(version, {})[oid] = 1
+                self._cache.invalidate(oid, version, tid)
+                versions.setdefault((version, tid), {})[oid] = tid
 
             if self._db is not None:
-                for v, d in versions.items():
-                    self._db.invalidate(d, version=v)
+                for (version, tid), d in versions.items():
+                    self._db.invalidate(tid, d, version=version)
         finally:
             self._lock.release()
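
A small worked example of the grouping above: invalidations sharing a
(version, tid) pair are batched into a single DB.invalidate() call,
with the inner dict mapping each oid to the invalidating tid.  The
literal oids and tids below are made up:

    invs = [("oid1", "", "tidA"), ("oid2", "", "tidA"),
            ("oid3", "v1", "tidA")]
    versions = {}
    for oid, version, tid in invs:
        versions.setdefault((version, tid), {})[oid] = tid
    assert versions == {("", "tidA"): {"oid1": "tidA", "oid2": "tidA"},
                        ("v1", "tidA"): {"oid3": "tidA"}}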
 
@@ -1099,7 +1120,8 @@
             for t in args:
                 self._pickler.dump(t)
             return
-        self._process_invalidations(args)
+        self._process_invalidations([(oid, version, tid)
+                                     for oid, version in args])
 
     # The following are for compatibility with protocol version 2.0.0
 
@@ -1110,36 +1132,10 @@
     end = endVerify
     Invalidate = invalidateTrans
 
-try:
-    StopIteration
-except NameError:
-    class StopIteration(Exception):
-        pass
-
-class InvalidationLogIterator:
-    """Helper class for reading invalidations in endVerify."""
-
-    def __init__(self, fileobj):
-        self._unpickler = cPickle.Unpickler(fileobj)
-        self.getitem_i = 0
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        oid, version = self._unpickler.load()
+def InvalidationLogIterator(fileobj):
+    unpickler = cPickle.Unpickler(fileobj)
+    while 1:
+        oid, version = unpickler.load()
         if oid is None:
-            raise StopIteration
-        return oid, version
-
-    # The __getitem__() method is needed to support iteration
-    # in Python 2.1.
-
-    def __getitem__(self, i):
-        assert i == self.getitem_i
-        try:
-            obj = self.next()
-        except StopIteration:
-            raise IndexError, i
-        self.getitem_i += 1
-        return obj
+            break
+        yield oid, version, None
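
A usage sketch for the generator version above, assuming a pickle log
terminated by an (oid, version) record whose oid is None, which is the
sentinel the loop checks for; the record contents below are made up:

    import cPickle
    from StringIO import StringIO

    log = StringIO()
    p = cPickle.Pickler(log, 1)
    p.dump(("oid-1", ""))        # one invalidation record
    p.dump((None, None))         # sentinel ends the log
    log.seek(0)
    for oid, version, tid in InvalidationLogIterator(log):
        print oid, version, tid  # tid is always None from the log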

=== Removed File ZODB3/ZEO/ICache.py ===

=== Removed File ZODB3/ZEO/ClientCache.py ===



