[Checkins] SVN: relstorage/trunk/ The memcache integration now uses checkpoints, which

Shane Hathaway shane at hathawaymix.org
Sat Oct 17 19:19:15 EDT 2009


Log message for revision 105119:
  The memcache integration now uses checkpoints, which
  will theoretically yield a high cache hit rate even when
  committing many transactions.
  
  Also added process-wide caching similar to a ZEO cache.
  The new cache has the same interface as memcache.
  
  Now we have a multi-level cache architecture:
  
    - L1 (cPickleCache)
    - L2 (process-wide cache)
    - L3 (memcached)
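
  The lookup path implied by this layering is the trickiest part of the
  change, so here is a condensed sketch of the order in which the new
  StorageCache.load() (relstorage/cache.py below) consults the levels.
  It is illustrative only: the names cached_load and load_from_db are
  placeholders, and the real code uses get_multi(), copies hits back to
  the preferred key, and caches database results.

    def cached_load(clients, prefix, oid_int, checkpoints,
                    delta_after0, delta_after1, load_from_db):
        # Cache keys name an exact (tid, oid) pair, so a new tid means a
        # new key and no explicit invalidation is needed.
        def key(tid_int):
            return '%s:state:%d:%d' % (prefix, tid_int, oid_int)

        tid_int = delta_after0.get(oid_int)
        if tid_int:
            # Changed after checkpoint0: only one key can hold the state.
            keys = [key(tid_int)]
        else:
            cp0, cp1 = checkpoints
            keys = [key(cp0)]                 # preferred location
            tid_int = delta_after1.get(oid_int)
            if tid_int:
                keys.append(key(tid_int))     # changed after checkpoint1
            elif cp1 != cp0:
                keys.append(key(cp1))

        for client in clients:                # L2 (in process) first, then L3
            for k in keys:
                data = client.get(k)
                if data and len(data) >= 8:
                    return data[8:], data[:8]     # (state, 8-byte tid)

        return load_from_db(oid_int)          # miss everywhere: hit the database

  With cache-prefix 'zzz', the resulting entries look like
  'zzz:checkpoints' (the two checkpoint tids as a string, e.g. '5012 4890'),
  'zzz:commits' (a counter bumped after every commit), and
  'zzz:state:<tid>:<oid>' (an 8-byte tid followed by the pickled state).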
  
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  U   relstorage/trunk/relstorage/adapters/poller.py
  U   relstorage/trunk/relstorage/cache.py
  U   relstorage/trunk/relstorage/options.py
  U   relstorage/trunk/relstorage/storage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py
  U   relstorage/trunk/relstorage/tests/test_cache.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/CHANGES.txt	2009-10-17 23:19:15 UTC (rev 105119)
@@ -14,6 +14,9 @@
 - Revised the way RelStorage uses memcached.  Minimized the number of
   trips to both the cache server and the database.
 
+- Added an in-process pickle cache that serves a function similar to the
+  ZEO cache.
+
 - Added a wrapper module for pylibmc.
 
 - Store operations now use multi-insert and multi-delete SQL

Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2009-10-17 23:19:15 UTC (rev 105119)
@@ -340,10 +340,20 @@
         of the exceptions listed in the disconnected_exceptions
         attribute of the associated IConnectionManager.
 
-        Returns (changed_oids, new_polled_tid).
+        Returns (changes, new_polled_tid), where changes is either
+        a list of (oid, tid) pairs that have changed, or None to
+        indicate that the changes are too complex to list.
+        new_polled_tid is never None.
         """
 
+    def list_changes(cursor, after_tid, last_tid):
+        """Return the (oid, tid) values changed in a range of transactions.
 
+        The returned iterable must include all changes in the range
+        after_tid < tid <= last_tid.
+        """
+
+
 class ISchemaInstaller(Interface):
     """Install the schema in the database, clear it, or uninstall it"""
 

Modified: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py	2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/adapters/poller.py	2009-10-17 23:19:15 UTC (rev 105119)
@@ -35,11 +35,15 @@
         committed in that transaction will not be included in the list
         of changed OIDs.
 
-        Returns (changed_oids, new_polled_tid).
+        Returns (changes, new_polled_tid), where changes is either
+        a list of (oid, tid) pairs that have changed, or None to
+        indicate that the changes are too complex to list.
+        new_polled_tid is never None.
         """
         # find out the tid of the most recent transaction.
         cursor.execute(self.poll_query)
         new_polled_tid = cursor.fetchone()[0]
+        assert new_polled_tid is not None
 
         if prev_polled_tid is None:
             # This is the first time the connection has polled.
@@ -72,13 +76,13 @@
         if new_polled_tid > prev_polled_tid:
             if self.keep_history:
                 stmt = """
-                SELECT zoid
+                SELECT zoid, tid
                 FROM current_object
                 WHERE tid > %(tid)s
                 """
             else:
                 stmt = """
-                SELECT zoid
+                SELECT zoid, tid
                 FROM object_state
                 WHERE tid > %(tid)s
                 """
@@ -89,9 +93,9 @@
             stmt = intern(stmt % self.runner.script_vars)
 
             cursor.execute(stmt, params)
-            oids = [oid for (oid,) in cursor]
+            changes = list(cursor)
 
-            return oids, new_polled_tid
+            return changes, new_polled_tid
 
         else:
             # We moved backward in time. This can happen after failover
@@ -110,3 +114,28 @@
             # invalidating the whole cache is simpler.
             return None, new_polled_tid
 
+    def list_changes(self, cursor, after_tid, last_tid):
+        """Return the (oid, tid) values changed in a range of transactions.
+
+        The returned iterable must include the latest changes in the range
+        after_tid < tid <= last_tid.
+        """
+        if self.keep_history:
+            stmt = """
+            SELECT zoid, tid
+            FROM current_object
+            WHERE tid > %(min_tid)s
+                AND tid <= %(max_tid)s
+            """
+        else:
+            stmt = """
+            SELECT zoid, tid
+            FROM object_state
+            WHERE tid > %(min_tid)s
+                AND tid <= %(max_tid)s
+            """
+        params = {'min_tid': after_tid, 'max_tid': last_tid}
+        stmt = intern(stmt % self.runner.script_vars)
+
+        cursor.execute(stmt, params)
+        return list(cursor)
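
  For illustration: a poll_invalidations call that sees two new
  transactions might return ([(42, 5012), (7, 5013)], 5013).  If the
  database appears to have moved backward, e.g. after failover to an
  out-of-date replica, it returns (None, new_polled_tid) so the caller
  invalidates everything.  list_changes, by contrast, returns just the
  (oid, tid) pairs for an explicitly requested tid range.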

Modified: relstorage/trunk/relstorage/cache.py
===================================================================
--- relstorage/trunk/relstorage/cache.py	2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/cache.py	2009-10-17 23:19:15 UTC (rev 105119)
@@ -15,92 +15,193 @@
 from relstorage.autotemp import AutoTemporaryFile
 from ZODB.utils import p64
 from ZODB.utils import u64
-import time
+import random
+import threading
 
 
 class StorageCache(object):
     """RelStorage integration with memcached or similar.
+
+    Holds a list of memcache clients in order from most local to
+    most global.  The first is a LocalClient, which stores the cache
+    in process but shares the cache between threads.
     """
 
-    # send_limit: max approx. bytes to buffer before sending to the cache
+    # send_limit: approximate limit on the bytes to buffer before
+    # sending to the cache.
     send_limit = 1024 * 1024
 
-    def __init__(self, options):
-        module_name = options.cache_module_name
-        module = __import__(module_name, {}, {}, ['Client'])
-        servers = options.cache_servers
-        if isinstance(servers, basestring):
-            servers = servers.split()
-        self.client = module.Client(servers)
+    # queue is an AutoTemporaryFile during transaction commit.
+    queue = None
+
+    # queue_contents is a map of {oid_int: (startpos, endpos)}
+    # during transaction commit.
+    queue_contents = None
+
+    # checkpoints, when set, is a tuple containing the integer
+    # transaction IDs of the two current checkpoints. checkpoint0 is
+    # greater than or equal to checkpoint1.
+    checkpoints = None
+
+    # current_tid contains the last polled transaction ID.  Invariant:
+    # when self.checkpoints is not None, self.delta_after0 has info
+    # from all transactions in the range:
+    #   self.checkpoints[0] < tid <= self.current_tid
+    current_tid = 0
+
+    # commit_count contains the last polled value of the
+    # :commits cache key.  The most global client currently
+    # responding stores the value.
+    commit_count = object()
+
+    def __init__(self, adapter, options, local_client=None):
+        self.adapter = adapter
+        self.options = options
+        if local_client is None:
+            local_client = LocalClient(options)
+        self.clients_local_first = [local_client]
+
+        if options.cache_servers:
+            module_name = options.cache_module_name
+            module = __import__(module_name, {}, {}, ['Client'])
+            servers = options.cache_servers
+            if isinstance(servers, basestring):
+                servers = servers.split()
+            self.clients_local_first.append(module.Client(servers))
+
+        # self.clients_local_first is in order from local to global caches,
+        # while self.clients_global_first is in order from global to local.
+        self.clients_global_first = list(self.clients_local_first)
+        self.clients_global_first.reverse()
+
+        # every cache key has a prefix
         self.prefix = options.cache_prefix or ''
 
-        # queue is an AutoTemporaryFile during txn commit.
-        self.queue = None
+        # commit_count_key contains a number that is incremented
+        # for every commit.  See tpc_finish().
+        self.commit_count_key = '%s:commits' % self.prefix
 
-        # queue_contents is a map of {oid: (startpos, endpos)}
-        # during txn commit.
-        self.queue_contents = None
+        # checkpoints_key holds the current checkpoints.
+        self.checkpoints_key = '%s:checkpoints' % self.prefix
 
-        # commit_count_key is the cache key to poll for changes
-        self.commit_count_key = '%s:commit_count' % self.prefix
+        # delta_after0 contains {oid: tid} after checkpoint 0
+        # and before or at self.current_tid.
+        self.delta_after0 = {}
 
-        # polled_commit_count contains the last polled value of the
-        # 'commit_count' cache key
-        self.polled_commit_count = 0
+        # delta_after1 contains {oid: tid} after checkpoint 1 and
+        # before or at checkpoint 0. The content of delta_after1 only
+        # changes when checkpoints move.
+        self.delta_after1 = {}
 
+        # delta_size_limit places an approximate limit on the number of
+        # entries in the delta_after maps.
+        self.delta_size_limit = options.cache_delta_size_limit
+
+    def new_instance(self):
+        """Return a copy of this instance sharing the same local client"""
+        local_client = self.clients_local_first[0]
+        return StorageCache(self.adapter, self.options, local_client)
+
     def flush_all(self):
         """Remove all data from the cache.  Called by RelStorage.zap_all()"""
-        self.client.flush_all()
+        for client in self.clients_local_first:
+            client.flush_all()
+        self.checkpoints = None
+        self.delta_after0 = {}
+        self.delta_after1 = {}
 
-    def load(self, cursor, oid_int, prev_polled_tid, adapter):
+    def load(self, cursor, oid_int):
         """Load the given object from cache if possible.
 
         Fall back to loading from the database.
         """
-        client = self.client
-        state_key = '%s:state:%d' % (self.prefix, oid_int)
-        if prev_polled_tid:
-            backptr_key = '%s:back:%d:%d' % (
-                self.prefix, prev_polled_tid, oid_int)
-            v = client.get_multi([state_key, backptr_key])
-            if v is not None:
-                cache_data = v.get(state_key)
-                backptr = v.get(backptr_key)
-            else:
-                cache_data = None
-                backptr = None
-        else:
-            cache_data = client.get(state_key)
-            backptr = None
+        if not self.checkpoints:
+            # No poll has occurred yet.  For safety, don't use the cache.
+            return self.adapter.mover.load_current(cursor, oid_int)
 
-        state = None
-        if cache_data and len(cache_data) >= 8:
-            # validate the cache result
-            tid = cache_data[:8]
-            tid_int = u64(tid)
-            if tid_int == prev_polled_tid or tid == backptr:
-                # the cached data is current.
-                state = cache_data[8:]
+        prefix = self.prefix
 
-        if state is None:
-            # could not load from cache, so get from the database
-            state, tid_int = adapter.mover.load_current(
+        # Get the object from the transaction specified
+        # by the following values, in order:
+        #
+        #   1. delta_after0[oid_int]
+        #   2. checkpoints[0]
+        #   3. delta_after1[oid_int]
+        #   4. checkpoints[1]
+        #   5. The database.
+        #
+        # checkpoints[0] is the preferred location.
+        #
+        # If delta_after0 contains oid_int, we should not look at any
+        # other cache keys, since the tid_int specified in delta_after0
+        # replaces all older transaction IDs. Similarly, if
+        # delta_after1 contains oid_int, we should not look at
+        # checkpoints[1]. Also, when both checkpoints are set to the
+        # same transaction ID, we don't need to ask for the same key
+        # twice.
+
+        tid_int = self.delta_after0.get(oid_int)
+        if tid_int:
+            # This object changed after checkpoint0, so
+            # there is only one place to look for its state.
+            cachekey = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
+            for client in self.clients_local_first:
+                cache_data = client.get(cachekey)
+                if cache_data and len(cache_data) >= 8:
+                    # cache hit
+                    assert cache_data[:8] == p64(tid_int)
+                    return cache_data[8:], tid_int
+            # cache miss
+            state, actual_tid_int = self.adapter.mover.load_current(
                 cursor, oid_int)
-            state = str(state or '')
-            if tid_int is not None:
-                # cache the result
-                to_cache = {}
-                tid = p64(tid_int)
-                new_cache_data = tid + state
-                if new_cache_data != cache_data:
-                    to_cache[state_key] = new_cache_data
-                if prev_polled_tid and prev_polled_tid != tid_int:
-                    to_cache[backptr_key] = tid
-                if to_cache:
-                    client.set_multi(to_cache)
+            assert actual_tid_int == tid_int
+            for client in self.clients_local_first:
+                client.set(cachekey, '%s%s' % (p64(tid_int), state or ''))
+            return state, tid_int
 
+        # Make a list of cache keys to query. The list will have either
+        # 1 or 2 keys.
+        cp0, cp1 = self.checkpoints
+        cachekeys = []
+        cp0_key = '%s:state:%d:%d' % (prefix, cp0, oid_int)
+        cachekeys.append(cp0_key)
+        da1_key = None
+        cp1_key = None
+        tid_int = self.delta_after1.get(oid_int)
+        if tid_int:
+            da1_key = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
+            cachekeys.append(da1_key)
+        elif cp1 != cp0:
+            cp1_key = '%s:state:%d:%d' % (prefix, cp1, oid_int)
+            cachekeys.append(cp1_key)
+
+        for client in self.clients_local_first:
+            # Query the cache. Query multiple keys simultaneously to
+            # minimize latency.
+            response = client.get_multi(cachekeys)
+            if response:
+                cache_data = response.get(cp0_key)
+                if cache_data and len(cache_data) >= 8:
+                    # cache hit on the preferred cache key
+                    return cache_data[8:], u64(cache_data[:8])
+
+                if da1_key:
+                    cache_data = response.get(da1_key)
+                elif cp1_key:
+                    cache_data = response.get(cp1_key)
+                if cache_data and len(cache_data) >= 8:
+                    # cache hit, but copy the state to
+                    # the currently preferred key.
+                    client.set(cp0_key, cache_data)
+                    return cache_data[8:], u64(cache_data[:8])
+
+        # cache miss
+        state, tid_int = self.adapter.mover.load_current(cursor, oid_int)
+        if tid_int:
+            for client in self.clients_local_first:
+                client.set(cp0_key, '%s%s' % (p64(tid_int), state or ''))
         return state, tid_int
 
+
     def tpc_begin(self):
         """Prepare temp space for objects to cache."""
         self.queue = AutoTemporaryFile()
@@ -120,14 +221,15 @@
         endpos = queue.tell()
         self.queue_contents[oid_int] = (startpos, endpos)
 
-    def tpc_vote(self, tid):
-        """Now that the tid is chosen, send queued objects to the cache.
-        """
-        client = self.client
-        assert len(tid) == 8
+    def send_queue(self, tid):
+        """Now that this tid is known, send all queued objects to the cache"""
+        tid_int = u64(tid)
         send_size = 0
         to_send = {}
+        prefix = self.prefix
 
+        # Order the queue by file position, which should help if the
+        # file is large and needs to be read sequentially from disk.
         items = [
             (startpos, endpos, oid_int)
             for (oid_int, (startpos, endpos)) in self.queue_contents.items()
@@ -140,35 +242,70 @@
             state = self.queue.read(length)
             if len(state) != length:
                 raise AssertionError("Queued cache data is truncated")
-            cachekey = '%s:state:%d' % (self.prefix, oid_int)
-            to_send[cachekey] = '%s%s' % (tid, state)
-            send_size += length + len(cachekey)
-            if send_size >= self.send_limit:
-                client.set_multi(to_send)
+            cachekey = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
+            item_size = length + len(cachekey)
+            if send_size and send_size + item_size >= self.send_limit:
+                for client in self.clients_local_first:
+                    client.set_multi(to_send)
                 to_send.clear()
                 send_size = 0
+            to_send[cachekey] = '%s%s' % (tid, state)
+            send_size += item_size
 
         if to_send:
-            client.set_multi(to_send)
+            for client in self.clients_local_first:
+                client.set_multi(to_send)
 
         self.queue_contents.clear()
         self.queue.seek(0)
 
+    def after_tpc_finish(self, tid):
+        """Update the commit count in the cache.
 
-    def tpc_finish(self):
-        """Update the commit count in the cache."""
-        client = self.client
+        This is called after the database commit lock is released,
+        but before releasing the storage lock that will allow other
+        threads to use this instance.
+        """
+        tid_int = u64(tid)
+
+        # Why do we cache a commit count instead of the transaction ID?
+        # Here's why. This method gets called after the commit lock is
+        # released; other threads or processes could have committed
+        # more transactions in the time that has passed since releasing
+        # the lock, so a cached transaction ID would cause a race. It
+        # also wouldn't work to cache the transaction ID before
+        # releasing the commit lock, since that could cause some
+        # threads or processes watching the cache for changes to think
+        # they are up to date when they are not. The commit count
+        # solves these problems by ensuring that every commit is
+        # followed by a change to the cache that does not conflict with
+        # concurrent committers.
         cachekey = self.commit_count_key
-        if client.incr(cachekey) is None:
-            # Use the current time as an initial commit_count value.
-            client.add(cachekey, int(time.time()))
-            # A concurrent committer could have won the race to set the
-            # initial commit_count.  Increment commit_count so that it
-            # doesn't matter who won.
-            client.incr(cachekey)
+        for client in self.clients_global_first:
+            if client.incr(cachekey) is None:
+                # Initialize commit_count.
+                # Use a random number for the base.
+                client.add(cachekey, random.randint(1, 1<<31))
+                # A concurrent committer could have won the race to set the
+                # initial commit_count.  Increment commit_count so that it
+                # doesn't matter who won.
+                if client.incr(cachekey) is not None:
+                    break
+                # else the client is dead.  Fall back to the next client.
 
+        if self.checkpoints:
+            for oid_int in self.queue_contents:
+                # Future cache lookups for oid_int should now use
+                # the tid just committed.
+                self.delta_after0[oid_int] = tid_int
+
+        self.send_queue(tid)
+
     def clear_temp(self):
-        """Clear any transactional data.  Called after txn finish or abort."""
+        """Discard all transaction-specific temporary data.
+
+        Called after transaction finish or abort.
+        """
         self.queue_contents = None
         if self.queue is not None:
             self.queue.close()
@@ -176,8 +313,296 @@
 
     def need_poll(self):
         """Return True if the commit count has changed"""
-        new_commit_count = self.client.get(self.commit_count_key)
-        if new_commit_count != self.polled_commit_count:
-            self.polled_commit_count = new_commit_count
+        for client in self.clients_global_first:
+            new_commit_count = client.get(self.commit_count_key)
+            if new_commit_count is not None:
+                break
+        if new_commit_count != self.commit_count:
+            self.commit_count = new_commit_count
             return True
         return False
+
+    def after_poll(self, cursor, prev_tid_int, new_tid_int, changes):
+        """Update checkpoint data after a database poll.
+
+        cursor is connected to a load connection.
+
+        changes lists all [(oid_int, tid_int)] changed after
+        prev_tid_int, up to and including new_tid_int, excluding the
+        changes last committed by the associated storage instance.
+        changes can be None to indicate too many objects changed
+        to list them all.
+
+        prev_tid_int can be None, in which case the changes
+        parameter will be ignored.  new_tid_int cannot be None.
+        """
+        new_checkpoints = None
+        for client in self.clients_global_first:
+            s = client.get(self.checkpoints_key)
+            if s:
+                try:
+                    c0, c1 = s.split()
+                    c0 = int(c0)
+                    c1 = int(c1)
+                except ValueError:
+                    # Invalid checkpoint cache value; ignore it.
+                    pass
+                else:
+                    if c0 >= c1:
+                        new_checkpoints = (c0, c1)
+                        break
+
+        if not new_checkpoints:
+            new_checkpoints = (new_tid_int, new_tid_int)
+
+            if not self.checkpoints:
+                # Initialize the checkpoints.
+                for client in self.clients_global_first:
+                    client.set(
+                        self.checkpoints_key, '%d %d' % new_checkpoints)
+            else:
+                # Suggest reinstatement of the former checkpoints, but
+                # use new checkpoints for this instance. Using new
+                # checkpoints ensures that we don't build up
+                # self.delta_after0 in case the cache is offline.
+                for client in self.clients_global_first:
+                    client.set(
+                        self.checkpoints_key, '%d %d' % self.checkpoints)
+
+            self.checkpoints = new_checkpoints
+            self.delta_after0 = {}
+            self.delta_after1 = {}
+            self.current_tid = new_tid_int
+            return
+
+        cp0, cp1 = new_checkpoints
+        allow_shift = True
+        if cp0 > new_tid_int:
+            # checkpoint0 is in a future that this instance can't
+            # yet see.  Ignore the checkpoint change for now.
+            new_checkpoints = self.checkpoints
+            allow_shift = False
+
+        if (new_checkpoints == self.checkpoints
+                and changes is not None
+                and prev_tid_int
+                and prev_tid_int <= self.current_tid
+                and new_tid_int >= self.current_tid
+                ):
+            # Keep the checkpoints and update self.delta_after0.
+            m = self.delta_after0
+            m_get = m.get
+            for oid_int, tid_int in changes:
+                my_tid_int = m_get(oid_int)
+                if my_tid_int is None or tid_int > my_tid_int:
+                    m[oid_int] = tid_int
+            self.current_tid = new_tid_int
+        else:
+            # Use the checkpoints specified by the cache.
+            # Rebuild delta_after0 and delta_after1.
+            new_delta_after0 = {}
+            new_delta_after1 = {}
+            # poller.list_changes provides an iterator of (oid, tid) where
+            # tid > after_tid and tid <= last_tid.
+            changes = self.adapter.poller.list_changes(
+                cursor, cp1, new_tid_int)
+            for oid_int, tid_int in changes:
+                if tid_int > cp0:
+                    new_delta_after0[oid_int] = tid_int
+                elif tid_int > cp1:
+                    new_delta_after1[oid_int] = tid_int
+            self.checkpoints = new_checkpoints
+            self.delta_after0 = new_delta_after0
+            self.delta_after1 = new_delta_after1
+            self.current_tid = new_tid_int
+
+        if allow_shift and len(self.delta_after0) >= self.delta_size_limit:
+            # delta_after0 has reached its limit.  The way to
+            # shrink it is to shift the checkpoints.  Suggest
+            # shifted checkpoints for future polls.
+            self._suggest_shifted_checkpoints(new_tid_int)
+
+
+    def _suggest_shifted_checkpoints(self, tid_int):
+        """Suggest that future polls use a new pair of checkpoints.
+
+        This does nothing if another instance has already shifted
+        the checkpoints.
+
+        checkpoint0 shifts to checkpoint1 and the tid just committed
+        becomes checkpoint0.
+        """
+        cp0, cp1 = self.checkpoints
+        assert tid_int > cp0
+        expect = '%d %d' % self.checkpoints
+        change_to = '%d %d' % (tid_int, cp0)
+        for client in self.clients_global_first:
+            old_value = client.get(self.checkpoints_key)
+            if old_value:
+                break
+        if not old_value or old_value == expect:
+            # Shift the checkpoints.
+            # Although this is a race with other instances, the race
+            # should not matter.
+            for client in self.clients_global_first:
+                client.set(self.checkpoints_key, change_to)
+            # The poll code will later see the new checkpoints
+            # and update self.checkpoints and self.delta_after(0|1).
+
+
+class SizeOverflow(Exception):
+    """Too much memory would be consumed by a new key"""
+
+class LocalClientBucket(dict):
+    """A map that keeps a record of its approx. size.
+
+    Keys must be strings and most values are strings.
+    """
+
+    def __init__(self, limit):
+        self.size = 0
+        self.limit = limit
+        self._super = super(LocalClientBucket, self)
+
+    def __setitem__(self, key, value):
+        """Set an item.
+
+        Throws SizeOverflow if the new item would cause this map to
+        surpass its memory limit.
+        """
+        if isinstance(value, basestring):
+            sizedelta = len(value)
+        else:
+            sizedelta = 0
+        if key in self:
+            oldvalue = self[key]
+            if isinstance(oldvalue, basestring):
+                sizedelta -= len(oldvalue)
+        else:
+            sizedelta += len(key)
+        if self.size + sizedelta > self.limit:
+            raise SizeOverflow()
+        self._super.__setitem__(key, value)
+        self.size += sizedelta
+        return True
+
+    def __delitem__(self, key):
+        oldvalue = self[key]
+        self._super.__delitem__(key)
+        sizedelta = len(key)
+        if isinstance(oldvalue, basestring):
+            sizedelta += len(oldvalue)
+        self.size -= sizedelta
+
+
+class LocalClient(object):
+    """A memcache-like object that stores in dictionaries"""
+
+    def __init__(self, options):
+        self._lock = threading.Lock()
+        self._lock_acquire = self._lock.acquire
+        self._lock_release = self._lock.release
+        self._bucket_limit = 1000000 * options.cache_local_mb / 2
+        self._value_limit = self._bucket_limit / 10
+        self._bucket0 = LocalClientBucket(self._bucket_limit)
+        self._bucket1 = LocalClientBucket(self._bucket_limit)
+
+    def flush_all(self):
+        self._lock_acquire()
+        try:
+            self._bucket0 = LocalClientBucket(self._bucket_limit)
+            self._bucket1 = LocalClientBucket(self._bucket_limit)
+        finally:
+            self._lock_release()
+
+    def get(self, key):
+        self._lock_acquire()
+        try:
+            value = self._bucket0.get(key)
+            if value is None:
+                value = self._bucket1.get(key)
+                if value is None:
+                    return None
+                # This key is active, so move it to bucket0.
+                del self._bucket1[key]
+                self._set_one(key, value)
+            return value
+        finally:
+            self._lock_release()
+
+    def get_multi(self, keys):
+        res = {}
+        self._lock_acquire()
+        try:
+            for key in keys:
+                value = self._bucket0.get(key)
+                if value is None:
+                    value = self._bucket1.get(key)
+                    if value is None:
+                        continue
+                    # This key is active, so move it to bucket0.
+                    del self._bucket1[key]
+                    self._set_one(key, value)
+                res[key] = value
+        finally:
+            self._lock_release()
+        return res
+
+    def _set_one(self, key, value):
+        try:
+            self._bucket0[key] = value
+        except SizeOverflow:
+            # shift bucket0 to bucket1
+            self._bucket1 = self._bucket0
+            self._bucket0 = LocalClientBucket(self._bucket_limit)
+            self._bucket0[key] = value
+
+    def set(self, key, value):
+        self.set_multi({key: value})
+
+    def set_multi(self, d, allow_replace=True):
+        res = {}
+        self._lock_acquire()
+        try:
+            for key, value in d.iteritems():
+                if isinstance(value, basestring):
+                    if len(value) >= self._value_limit:
+                        # This value is too big, so don't cache it.
+                        continue
+
+                if key in self._bucket0:
+                    if not allow_replace:
+                        continue
+                    del self._bucket0[key]
+
+                if key in self._bucket1:
+                    if not allow_replace:
+                        continue
+                    del self._bucket1[key]
+
+                self._set_one(key, value)
+                res[key] = value
+
+        finally:
+            self._lock_release()
+        return res
+
+    def add(self, key, value):
+        self.set_multi({key: value}, allow_replace=False)
+
+    def incr(self, key):
+        self._lock_acquire()
+        try:
+            value = self._bucket0.get(key)
+            if value is None:
+                value = self._bucket1.get(key)
+                if value is None:
+                    return None
+                # this key is active, so move it to bucket0
+                del self._bucket1[key]
+            res = int(value) + 1
+            self._set_one(key, res)
+            return res
+        finally:
+            self._lock_release()
+
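
  A minimal usage sketch of the LocalClient added above (the _Opts class
  here is a hypothetical stand-in for the real options object; only the
  cache_local_mb attribute that LocalClient reads is assumed):

    from relstorage.cache import LocalClient

    class _Opts(object):
        cache_local_mb = 4          # same default the commit adds to options.py

    client = LocalClient(_Opts())
    client.set('zzz:state:55:2', 'x' * 100)   # stored in bucket0
    client.add('zzz:commits', 1)              # add() never replaces an entry
    client.incr('zzz:commits')                # returns 2

    assert client.get('zzz:state:55:2') == 'x' * 100
    assert client.get_multi(['zzz:commits']) == {'zzz:commits': 2}

  When bucket0 would exceed its byte budget (roughly half of
  cache_local_mb), it becomes bucket1 and a fresh bucket0 is started;
  keys read out of bucket1 are promoted back into bucket0, so recently
  used entries survive the rotation.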

Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py	2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/options.py	2009-10-17 23:19:15 UTC (rev 105119)
@@ -39,6 +39,8 @@
         self.cache_servers = ()  # ['127.0.0.1:11211']
         self.cache_module_name = 'memcache'
         self.cache_prefix = ''
+        self.cache_local_mb = 4
+        self.cache_delta_size_limit = 10000
 
         for key, value in kwoptions.iteritems():
             if key in self.__dict__:
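
  For illustration, the two new knobs sit alongside the existing cache
  options and can be passed the same way (the values below are made up):

    from relstorage.options import Options

    options = Options(
        cache_servers='127.0.0.1:11211',    # enables the memcached layer
        cache_local_mb=8,                   # size of the in-process cache
        cache_delta_size_limit=20000)       # when to suggest shifting checkpoints
    assert options.cache_local_mb == 8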

Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py	2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/storage.py	2009-10-17 23:19:15 UTC (rev 105119)
@@ -139,7 +139,7 @@
 
 
     def __init__(self, adapter, name=None, create=True,
-            options=None, **kwoptions):
+            options=None, cache=None, **kwoptions):
         self._adapter = adapter
 
         if options is None:
@@ -177,8 +177,10 @@
         # but not yet used.
         self._preallocated_oids = []
 
-        if options.cache_servers:
-            self._cache = StorageCache(options)
+        if cache is not None:
+            self._cache = cache
+        else:
+            self._cache = StorageCache(adapter, options)
 
         if options.blob_dir:
             from ZODB.blob import FilesystemHelper
@@ -270,16 +272,12 @@
         """
         self._adapter.schema.zap_all()
         self._rollback_load_connection()
-        cache = self._cache
-        if cache is not None:
-            cache.flush_all()
+        self._cache.flush_all()
 
     def clear_cache(self):
-        """Clear all data from memcached.  Used by speed tests.
+        """Clear all data from storage caches.  Used by speed tests.
         """
-        cache = self._cache
-        if cache is not None:
-            cache.flush_all()
+        self._cache.flush_all()
 
     def release(self):
         """Release back end database sessions used by this storage instance.
@@ -311,8 +309,9 @@
         See ZODB.interfaces.IMVCCStorage.
         """
         adapter = self._adapter.new_instance()
+        cache = self._cache.new_instance()
         other = RelStorage(adapter=adapter, name=self.__name__,
-            create=False, options=self._options)
+            create=False, options=self._options, cache=cache)
         self._instances.append(weakref.ref(other))
         return other
 
@@ -421,14 +420,7 @@
             if not self._load_transaction_open:
                 self._restart_load()
             cursor = self._load_cursor
-
-            if cache is None:
-                state, tid_int = self._adapter.mover.load_current(
-                    cursor, oid_int)
-            else:
-                state, tid_int = cache.load(
-                    cursor, oid_int, self._prev_polled_tid, self._adapter)
-
+            state, tid_int = cache.load(cursor, oid_int)
         finally:
             self._lock_release()
 
@@ -531,6 +523,7 @@
         assert self._prepared_txn is None
 
         adapter = self._adapter
+        cache = self._cache
         cursor = self._store_cursor
         assert cursor is not None
         oid_int = u64(oid)
@@ -545,9 +538,7 @@
             # save the data in a temporary table
             adapter.mover.store_temp(
                 cursor, self._batcher, oid_int, prev_tid_int, data)
-            cache = self._cache
-            if cache is not None:
-                cache.store_temp(oid_int, data)
+            cache.store_temp(oid_int, data)
             return None
         finally:
             self._lock_release()
@@ -608,11 +599,9 @@
 
             self._restart_store()
             adapter = self._adapter
+            self._cache.tpc_begin()
             self._batcher = self._adapter.mover.make_batcher(
                 self._store_cursor)
-            cache = self._cache
-            if cache is not None:
-                cache.tpc_begin()
 
             if tid is not None:
                 # hold the commit lock and add the transaction now
@@ -676,9 +665,7 @@
         self._max_stored_oid = 0
         self._batcher = None
         self._txn_blobs = None
-        cache = self._cache
-        if cache is not None:
-            cache.clear_temp()
+        self._cache.clear_temp()
 
 
     def _finish_store(self):
@@ -791,10 +778,6 @@
                     ZODB.blob.rename_or_copy_blob(sourcename, targetname)
                     self._txn_blobs[oid] = targetname
 
-        cache = self._cache
-        if cache is not None:
-            cache.tpc_vote(self._tid)
-
         return serials
 
 
@@ -828,9 +811,10 @@
         self._adapter.txncontrol.commit_phase2(
             self._store_conn, self._store_cursor, txn)
         self._adapter.locker.release_commit_lock(self._store_cursor)
-        cache = self._cache
-        if cache is not None:
-            cache.tpc_finish()
+        self._cache.after_tpc_finish(self._tid)
+
+        # N.B. only set _ltid after the commit succeeds,
+        # including cache updates.
         self._ltid = self._tid
 
         #if self._txn_blobs and not self._adapter.keep_history:
@@ -1180,12 +1164,10 @@
         """Return true if polling is needed"""
         now = time.time()
 
-        cache = self._cache
-        if cache is not None:
-            if cache.need_poll():
-                # There is new data ready to poll
-                self._poll_at = now
-                return True
+        if self._cache.need_poll():
+            # There is new data ready to poll
+            self._poll_at = now
+            return True
 
         if not self._load_transaction_open:
             # Since the load connection is closed or does not have
@@ -1227,10 +1209,10 @@
 
             # get a list of changed OIDs and the most recent tid
             poll = self._adapter.poller.poll_invalidations
+            prev = self._prev_polled_tid
             try:
-                oid_ints, new_polled_tid = poll(
-                    self._load_conn, self._load_cursor,
-                    self._prev_polled_tid, ignore_tid)
+                changes, new_polled_tid = poll(
+                    self._load_conn, self._load_cursor, prev, ignore_tid)
             except self._adapter.connmanager.disconnected_exceptions, e:
                 log.warning("Reconnecting load_conn: %s", e)
                 self._drop_load_connection()
@@ -1240,17 +1222,19 @@
                     log.exception("Reconnect failed.")
                     raise
                 log.info("Reconnected.")
-                oid_ints, new_polled_tid = poll(
-                    self._load_conn, self._load_cursor,
-                    self._prev_polled_tid, ignore_tid)
+                changes, new_polled_tid = poll(
+                    self._load_conn, self._load_cursor, prev, ignore_tid)
 
+            self._cache.after_poll(
+                self._load_cursor, prev, new_polled_tid, changes)
+
             self._prev_polled_tid = new_polled_tid
 
-            if oid_ints is None:
+            if changes is None:
                 oids = None
             else:
                 oids = {}
-                for oid_int in oid_ints:
+                for oid_int, tid_int in changes:
                     oids[p64(oid_int)] = 1
             return oids
         finally:

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2009-10-17 23:19:15 UTC (rev 105119)
@@ -232,31 +232,33 @@
 
         db = DB(self._storage)
         try:
+            fakecache.data.clear()
             c1 = db.open()
-            self.assert_(c1._storage._cache.client.servers, ['x:1', 'y:2'])
-            fakecache.data.clear()
+            self.assertEqual(c1._storage._cache.clients_global_first[0].servers,
+                ['x:1', 'y:2'])
             r1 = c1.root()
             # the root state should now be cached
-            self.assertEqual(fakecache.data.keys(), ['zzz:state:0'])
+            self.assertEqual(len(fakecache.data), 2)
+            self.assertTrue('zzz:checkpoints' in fakecache.data)
+            self.assertEqual(sorted(fakecache.data.keys())[1][:10],
+                'zzz:state:')
             r1['alpha'] = PersistentMapping()
-            self.assertFalse('zzz:commit_count' in fakecache.data)
+            self.assertFalse('zzz:commits' in fakecache.data)
             transaction.commit()
-            self.assertTrue('zzz:commit_count' in fakecache.data)
-            self.assertEqual(sorted(fakecache.data.keys()),
-                ['zzz:commit_count', 'zzz:state:0', 'zzz:state:1'])
+            self.assertTrue('zzz:commits' in fakecache.data)
+            self.assertEqual(len(fakecache.data.keys()), 5)
 
             oid = r1['alpha']._p_oid
             got, serial = c1._storage.load(oid, '')
             # another state should now be cached
-            self.assertEqual(len(fakecache.data.keys()), 3)
+            self.assertEqual(len(fakecache.data.keys()), 5)
 
             # make a change
             r1['beta'] = 0
             transaction.commit()
+            self.assertEqual(len(fakecache.data.keys()), 6)
 
             got, serial = c1._storage.load(oid, '')
-            # a backpointer should now be cached
-            self.assertEqual(len(fakecache.data.keys()), 4)
 
             # try to load an object that doesn't exist
             self.assertRaises(KeyError, c1._storage.load, 'bad.oid.', '')
@@ -311,7 +313,7 @@
         finally:
             db.close()
 
-    def checkPollInterval(self, using_cache=False):
+    def checkPollInterval(self, using_cache=True):
         # Verify the poll_interval parameter causes RelStorage to
         # delay invalidation polling.
         self._storage._options.poll_interval = 3600
@@ -347,6 +349,7 @@
                 self.assertEqual(r2['alpha'], 2)
                 self.assertFalse(c2._storage.need_poll())
             else:
+                # Now confirm that no poll is needed
                 self.assertFalse(c2._storage.need_poll())
                 c2._flush_invalidations()
                 r2 = c2.root()
@@ -365,10 +368,8 @@
         finally:
             db.close()
 
-    def checkPollIntervalWithCache(self):
-        self._storage._options.cache_servers = 'x:1'
-        self._storage._options.cache_module_name = fakecache.__name__
-        fakecache.data.clear()
+    def checkPollIntervalWithoutCache(self):
+        self._storage._options.cache_local_mb = 0
         self.checkPollInterval(using_cache=True)
 
     def checkDoubleCommitter(self):

Modified: relstorage/trunk/relstorage/tests/test_cache.py
===================================================================
--- relstorage/trunk/relstorage/tests/test_cache.py	2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/tests/test_cache.py	2009-10-17 23:19:15 UTC (rev 105119)
@@ -26,103 +26,125 @@
         from relstorage.cache import StorageCache
         return StorageCache
 
+    def _makeOne(self):
+        return self.getClass()(MockAdapter(), MockOptionsWithFakeCache())
+
     def test_ctor(self):
         from relstorage.tests.fakecache import Client
-        c = self.getClass()(MockOptions())
-        self.assert_(isinstance(c.client, Client))
-        self.assertEqual(c.client.servers, ['host:9999'])
+        c = self._makeOne()
+        self.assertEqual(len(c.clients_local_first), 2)
+        self.assertEqual(len(c.clients_global_first), 2)
+        self.assert_(isinstance(c.clients_global_first[0], Client))
+        self.assertEqual(c.clients_global_first[0].servers, ['host:9999'])
         self.assertEqual(c.prefix, 'myprefix')
 
     def test_flush_all(self):
         from relstorage.tests.fakecache import data
         data.clear()
-        c = self.getClass()(MockOptions())
+        c = self._makeOne()
         data['x'] = '1'
         c.flush_all()
         self.assert_(not data)
+        self.assertEqual(c.checkpoints, None)
+        self.assertEqual(c.delta_after0, {})
+        self.assertEqual(c.delta_after1, {})
 
-    def test_load_from_current_transaction(self):
+    def test_load_without_checkpoints(self):
+        c = self._makeOne()
+        res = c.load(None, 2)
+        self.assertEqual(res, (None, None))
+
+    def test_load_using_delta_after0_hit(self):
         from relstorage.tests.fakecache import data
         from ZODB.utils import p64
-        c = self.getClass()(MockOptions())
-        tid_int = 50
-        tid = p64(tid_int)
-        data['myprefix:state:2'] = tid + 'STATE'
-        state, got_tid_int = c.load(None, 2, tid_int, None)
-        self.assertEqual(state, 'STATE')
-        self.assertEqual(got_tid_int, tid_int)
+        adapter = MockAdapter()
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        c.checkpoints = (50, 40)
+        c.delta_after0[2] = 55
+        data['myprefix:state:55:2'] = p64(55) + 'abc'
+        res = c.load(None, 2)
+        self.assertEqual(res, ('abc', 55))
 
-    def test_load_from_backptr(self):
+    def test_load_using_delta_after0_miss(self):
         from relstorage.tests.fakecache import data
         from ZODB.utils import p64
-        c = self.getClass()(MockOptions())
-        tid_int = 50
-        tid = p64(tid_int)
-        data['myprefix:state:2'] = tid + 'STATE'
-        data['myprefix:back:60:2'] = tid
-        state, got_tid_int = c.load(None, 2, 60, None)
-        self.assertEqual(state, 'STATE')
-        self.assertEqual(got_tid_int, tid_int)
+        adapter = MockAdapter()
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        c.checkpoints = (50, 40)
+        c.delta_after0[2] = 55
+        adapter.mover.data[2] = ('abc', 55)
+        res = c.load(None, 2)
+        self.assertEqual(res, ('abc', 55))
 
-    def test_load_backptr_missing(self):
+    def test_load_using_checkpoint0_hit(self):
         from relstorage.tests.fakecache import data
         from ZODB.utils import p64
-        c = self.getClass()(MockOptions())
-        tid_int = 50
-        tid = p64(tid_int)
-        data['myprefix:state:2'] = tid + 'STATE'
         adapter = MockAdapter()
-        adapter.mover.data[2] = ('STATE', 50)
-        state, got_tid_int = c.load(None, 2, 60, adapter)
-        self.assertEqual(state, 'STATE')
-        self.assertEqual(got_tid_int, 50)
-        self.assertEqual(data, {
-            'myprefix:state:2': tid + 'STATE',
-            'myprefix:back:60:2': tid,
-            })
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        c.checkpoints = (50, 40)
+        data['myprefix:state:50:2'] = p64(45) + 'xyz'
+        res = c.load(None, 2)
+        self.assertEqual(res, ('xyz', 45))
 
-    def test_load_state_expired(self):
+    def test_load_using_checkpoint0_miss(self):
         from relstorage.tests.fakecache import data
         from ZODB.utils import p64
-        c = self.getClass()(MockOptions())
-        tid_int = 50
-        tid = p64(tid_int)
-        data['myprefix:state:2'] = tid + 'STATE'
         adapter = MockAdapter()
-        adapter.mover.data[2] = ('NEWSTATE', 55)
-        state, got_tid_int = c.load(None, 2, 60, adapter)
-        self.assertEqual(state, 'NEWSTATE')
-        self.assertEqual(got_tid_int, 55)
-        self.assertEqual(data, {
-            'myprefix:state:2': p64(55) + 'NEWSTATE',
-            'myprefix:back:60:2': p64(55),
-            })
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        c.checkpoints = (50, 40)
+        adapter.mover.data[2] = ('xyz', 45)
+        res = c.load(None, 2)
+        self.assertEqual(res, ('xyz', 45))
+        self.assertEqual(data.get('myprefix:state:50:2'), p64(45) + 'xyz')
 
-    def test_load_state_missing(self):
+    def test_load_using_delta_after1_hit(self):
         from relstorage.tests.fakecache import data
         from ZODB.utils import p64
-        c = self.getClass()(MockOptions())
-        tid_int = 50
-        tid = p64(tid_int)
         adapter = MockAdapter()
-        adapter.mover.data[2] = ('NEWSTATE', 55)
-        state, got_tid_int = c.load(None, 2, 60, adapter)
-        self.assertEqual(state, 'NEWSTATE')
-        self.assertEqual(got_tid_int, 55)
-        self.assertEqual(data, {
-            'myprefix:state:2': p64(55) + 'NEWSTATE',
-            'myprefix:back:60:2': p64(55),
-            })
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        c.checkpoints = (50, 40)
+        c.delta_after1[2] = 45
+        data['myprefix:state:45:2'] = p64(45) + 'abc'
+        res = c.load(None, 2)
+        self.assertEqual(res, ('abc', 45))
+        self.assertEqual(data.get('myprefix:state:50:2'), p64(45) + 'abc')
 
-    def test_load_no_object(self):
-        c = self.getClass()(MockOptions())
+    def test_load_using_delta_after1_miss(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
         adapter = MockAdapter()
-        state, got_tid_int = c.load(None, 2, 60, adapter)
-        self.assertEqual(state, '')
-        self.assertEqual(got_tid_int, None)
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        c.checkpoints = (50, 40)
+        c.delta_after1[2] = 45
+        adapter.mover.data[2] = ('abc', 45)
+        res = c.load(None, 2)
+        self.assertEqual(res, ('abc', 45))
+        self.assertEqual(data.get('myprefix:state:50:2'), p64(45) + 'abc')
 
+    def test_load_using_checkpoint1_hit(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        adapter = MockAdapter()
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        c.checkpoints = (50, 40)
+        data['myprefix:state:40:2'] = p64(35) + '123'
+        res = c.load(None, 2)
+        self.assertEqual(res, ('123', 35))
+        self.assertEqual(data.get('myprefix:state:50:2'), p64(35) + '123')
+
+    def test_load_using_checkpoint1_miss(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        adapter = MockAdapter()
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        c.checkpoints = (50, 40)
+        adapter.mover.data[2] = ('123', 35)
+        res = c.load(None, 2)
+        self.assertEqual(res, ('123', 35))
+        self.assertEqual(data.get('myprefix:state:50:2'), p64(35) + '123')
+
     def test_store_temp(self):
-        c = self.getClass()(MockOptions())
+        c = self._makeOne()
         c.tpc_begin()
         c.store_temp(2, 'abc')
         c.store_temp(1, 'def')
@@ -131,80 +153,335 @@
         c.queue.seek(0)
         self.assertEqual(c.queue.read(), 'abcdefghi')
 
-    def test_tpc_vote_small(self):
+    def test_send_queue_small(self):
         from relstorage.tests.fakecache import data
         from ZODB.utils import p64
-        c = self.getClass()(MockOptions())
+        c = self._makeOne()
         c.tpc_begin()
         c.store_temp(2, 'abc')
         c.store_temp(3, 'def')
         tid = p64(55)
-        c.tpc_vote(tid)
+        c.send_queue(tid)
         self.assertEqual(data, {
-            'myprefix:state:2': tid + 'abc',
-            'myprefix:state:3': tid + 'def',
+            'myprefix:state:55:2': tid + 'abc',
+            'myprefix:state:55:3': tid + 'def',
             })
 
-    def test_tpc_vote_large(self):
+    def test_send_queue_large(self):
         from relstorage.tests.fakecache import data
         from ZODB.utils import p64
-        c = self.getClass()(MockOptions())
+        c = self._makeOne()
         c.send_limit = 100
         c.tpc_begin()
         c.store_temp(2, 'abc')
         c.store_temp(3, 'def' * 100)
         tid = p64(55)
-        c.tpc_vote(tid)
+        c.send_queue(tid)
         self.assertEqual(data, {
-            'myprefix:state:2': tid + 'abc',
-            'myprefix:state:3': tid + ('def' * 100),
+            'myprefix:state:55:2': tid + 'abc',
+            'myprefix:state:55:3': tid + ('def' * 100),
             })
 
-    def test_tpc_vote_none(self):
+    def test_send_queue_none(self):
         from relstorage.tests.fakecache import data
         from ZODB.utils import p64
-        c = self.getClass()(MockOptions())
+        c = self._makeOne()
         c.tpc_begin()
         tid = p64(55)
-        c.tpc_vote(tid)
+        c.send_queue(tid)
         self.assertEqual(data, {})
 
-    def test_tpc_finish(self):
+    def test_after_tpc_finish(self):
         from relstorage.tests.fakecache import data
-        c = self.getClass()(MockOptions())
-        c.tpc_finish()
-        count = data['myprefix:commit_count']
+        from ZODB.utils import p64
+        c = self._makeOne()
+        c.tpc_begin()
+        c.after_tpc_finish(p64(55))
+        count = data['myprefix:commits']
         self.assert_(count > 0)
-        c.tpc_finish()
-        newcount = data['myprefix:commit_count']
+        c.after_tpc_finish(p64(55))
+        newcount = data['myprefix:commits']
         self.assert_(newcount == count + 1)
 
     def test_clear_temp(self):
-        c = self.getClass()(MockOptions())
+        c = self._makeOne()
         c.tpc_begin()
         c.clear_temp()
         self.assertEqual(c.queue_contents, None)
         self.assertEqual(c.queue, None)
 
     def test_need_poll(self):
-        c = self.getClass()(MockOptions())
+        from ZODB.utils import p64
+        c = self._makeOne()
         self.assertTrue(c.need_poll())
         self.assertFalse(c.need_poll())
         self.assertFalse(c.need_poll())
-        c.tpc_finish()
+        c.tpc_begin()
+        c.after_tpc_finish(p64(55))
         self.assertTrue(c.need_poll())
         self.assertFalse(c.need_poll())
         self.assertFalse(c.need_poll())
 
+    def test_after_poll_init_checkpoints(self):
+        from relstorage.tests.fakecache import data
+        c = self._makeOne()
+        c.after_poll(None, 40, 50, [])
+        self.assertEqual(c.checkpoints, (50, 50))
+        self.assertEqual(data['myprefix:checkpoints'], '50 50')
 
+    def test_after_poll_ignore_garbage_checkpoints(self):
+        from relstorage.tests.fakecache import data
+        data['myprefix:checkpoints'] = 'baddata'
+        c = self._makeOne()
+        c.after_poll(None, 40, 50, [])
+        self.assertEqual(c.checkpoints, (50, 50))
+        self.assertEqual(data['myprefix:checkpoints'], '50 50')
+
+    def test_after_poll_ignore_invalid_checkpoints(self):
+        from relstorage.tests.fakecache import data
+        data['myprefix:checkpoints'] = '60 70'  # bad: c0 < c1
+        c = self._makeOne()
+        c.after_poll(None, 40, 50, [])
+        self.assertEqual(c.checkpoints, (50, 50))
+        self.assertEqual(data['myprefix:checkpoints'], '50 50')
+
+    def test_after_poll_reinstate_checkpoints(self):
+        from relstorage.tests.fakecache import data
+        c = self._makeOne()
+        c.checkpoints = (40, 30)
+        c.after_poll(None, 40, 50, [])
+        self.assertEqual(c.checkpoints, (50, 50))
+        self.assertEqual(data['myprefix:checkpoints'], '40 30')
+
+    def test_after_poll_future_checkpoints(self):
+        from relstorage.tests.fakecache import data
+        data['myprefix:checkpoints'] = '90 80'
+        c = self._makeOne()
+        c.checkpoints = (40, 30)
+        c.current_tid = 40
+        c.after_poll(None, 40, 50, [(2, 45)])
+        # This instance can't yet see txn 90, so it sticks with
+        # the existing checkpoints.
+        self.assertEqual(c.checkpoints, (40, 30))
+        self.assertEqual(data['myprefix:checkpoints'], '90 80')
+        self.assertEqual(c.delta_after0, {2: 45})
+        self.assertEqual(c.delta_after1, {})
+
+    def test_after_poll_retain_checkpoints(self):
+        from relstorage.tests.fakecache import data
+        data['myprefix:checkpoints'] = '40 30'
+        c = self._makeOne()
+        c.checkpoints = (40, 30)
+        c.current_tid = 40
+        c.delta_after1 = {1: 35}
+        c.after_poll(None, 40, 50, [(2, 45)])
+        self.assertEqual(c.checkpoints, (40, 30))
+        self.assertEqual(data['myprefix:checkpoints'], '40 30')
+        self.assertEqual(c.delta_after0, {2: 45})
+        self.assertEqual(c.delta_after1, {1: 35})
+
+    def test_after_poll_new_checkpoints(self):
+        from relstorage.tests.fakecache import data
+        data['myprefix:checkpoints'] = '50 40'
+        adapter = MockAdapter()
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        adapter.poller.changes = [(3, 42), (1, 35), (2, 45)]
+        c.checkpoints = (40, 30)
+        c.current_tid = 40
+        c.after_poll(None, 40, 50, [(2, 45)])
+        self.assertEqual(c.checkpoints, (50, 40))
+        self.assertEqual(data['myprefix:checkpoints'], '50 40')
+        self.assertEqual(c.delta_after0, {})
+        self.assertEqual(c.delta_after1, {2: 45, 3: 42})
+
+    def test_after_poll_gap(self):
+        from relstorage.tests.fakecache import data
+        data['myprefix:checkpoints'] = '40 30'
+        adapter = MockAdapter()
+        c = self.getClass()(adapter, MockOptionsWithFakeCache())
+        adapter.poller.changes = [(3, 42), (1, 35), (2, 45)]
+        c.checkpoints = (40, 30)
+        c.current_tid = 40
+        # provide a prev_tid_int that shows a gap in the polled
+        # transaction list, forcing a rebuild of delta_after(0|1).
+        c.after_poll(None, 43, 50, [(2, 45)])
+        self.assertEqual(c.checkpoints, (40, 30))
+        self.assertEqual(data['myprefix:checkpoints'], '40 30')
+        self.assertEqual(c.delta_after0, {2: 45, 3: 42})
+        self.assertEqual(c.delta_after1, {1: 35})
+
+    def test_after_poll_shift_checkpoints(self):
+        from relstorage.tests.fakecache import data
+        data['myprefix:checkpoints'] = '40 30'
+        c = self._makeOne()
+        c.delta_size_limit = 2
+        c.checkpoints = (40, 30)
+        c.current_tid = 40
+        c.after_poll(None, 40, 314, [(1, 45), (2, 46)])
+        self.assertEqual(c.checkpoints, (40, 30))
+        self.assertEqual(data['myprefix:checkpoints'], '314 40')
+        self.assertEqual(c.delta_after0, {1: 45, 2: 46})
+        self.assertEqual(c.delta_after1, {})
+
+
+class LocalClientBucketTests(unittest.TestCase):
+
+    def getClass(self):
+        from relstorage.cache import LocalClientBucket
+        return LocalClientBucket
+
+    def test_set_string_value(self):
+        b = self.getClass()(100)
+        self.assertEqual(b.size, 0)
+        b['abc'] = 'defghi'
+        self.assertEqual(b.size, 9)
+        b['abc'] = '123'
+        self.assertEqual(b.size, 6)
+        b['abc'] = ''
+        self.assertEqual(b.size, 3)
+        b['abc'] = 'defghi'
+        self.assertEqual(b.size, 9)
+        del b['abc']
+        self.assertEqual(b.size, 0)
+
+    def test_set_integer_value(self):
+        b = self.getClass()(100)
+        self.assertEqual(b.size, 0)
+        b['abc'] = 5
+        self.assertEqual(b.size, 3)
+        b['abc'] = -7
+        self.assertEqual(b.size, 3)
+        b['abc'] = 0
+        self.assertEqual(b.size, 3)
+        del b['abc']
+        self.assertEqual(b.size, 0)
+
+    def test_set_limit(self):
+        from relstorage.cache import SizeOverflow
+        b = self.getClass()(5)
+        self.assertEqual(b.size, 0)
+        b['abc'] = 'xy'
+        self.assertEqual(b.size, 5)
+        b['abc'] = 'z'
+        self.assertEqual(b.size, 4)
+        self.assertRaises(SizeOverflow, b.__setitem__, 'abc', 'xyz')
+        self.assertEqual(b['abc'], 'z')
+
+
+class LocalClientTests(unittest.TestCase):
+
+    def getClass(self):
+        from relstorage.cache import LocalClient
+        return LocalClient
+
+    def _makeOne(self):
+        return self.getClass()(MockOptions())
+
+    def test_ctor(self):
+        c = self._makeOne()
+        self.assertEqual(c._bucket_limit, 500000)
+        self.assertEqual(c._value_limit, 50000)
+
+    def test_set_and_get(self):
+        c = self._makeOne()
+        c.set('abc', 'def')
+        self.assertEqual(c.get('abc'), 'def')
+        self.assertEqual(c.get('xyz'), None)
+
+    def test_set_multi_and_get_multi(self):
+        c = self._makeOne()
+        c.set_multi({'k0': 'abc', 'k1': 'def'})
+        self.assertEqual(c.get_multi(['k0', 'k1']), {'k0': 'abc', 'k1': 'def'})
+        self.assertEqual(c.get_multi(['k0', 'k2']), {'k0': 'abc'})
+        self.assertEqual(c.get_multi(['k2', 'k3']), {})
+
+    def test_lru(self):
+        # LocalClient is a simple LRU cache.  Confirm it keeps the right keys.
+        c = self._makeOne()
+        c._bucket_limit = 51
+        c.flush_all()
+        for i in range(5):
+            # add 10 bytes
+            c.set('k%d' % i, '01234567')
+        self.assertEqual(c._bucket0.size, 50)
+        self.assertEqual(c._bucket1.size, 0)
+        c.set('k5', '01234567')
+        self.assertEqual(c._bucket0.size, 10)
+        self.assertEqual(c._bucket1.size, 50)
+        v = c.get('k2')
+        self.assertEqual(v, '01234567')
+        self.assertEqual(c._bucket0.size, 20)
+        self.assertEqual(c._bucket1.size, 40)
+        for i in range(5):
+            # add 10 bytes
+            c.set('x%d' % i, '01234567')
+        self.assertEqual(c._bucket0.size, 20)
+        self.assertEqual(c._bucket1.size, 50)
+        self.assertEqual(c.get('x0'), '01234567')
+        self.assertEqual(c.get('x1'), '01234567')
+        self.assertEqual(c.get('x2'), '01234567')
+        self.assertEqual(c.get('x3'), '01234567')
+        self.assertEqual(c.get('x4'), '01234567')
+        self.assertEqual(c._bucket0.size, 50)
+        self.assertEqual(c._bucket1.size, 20)
+        self.assertEqual(c.get('k0'), None)
+        self.assertEqual(c.get('k1'), None)
+        self.assertEqual(c.get('k2'), '01234567')
+        self.assertEqual(c.get('k3'), None)
+        self.assertEqual(c.get('k4'), None)
+        self.assertEqual(c.get('k5'), None)
+
+        self.assertEqual(c._bucket0.size, 10)
+        self.assertEqual(c._bucket1.size, 50)
+
+        c.set('z0', '01234567')
+        self.assertEqual(c._bucket0.size, 20)
+        self.assertEqual(c._bucket1.size, 50)
+
+    def test_add(self):
+        c = self._makeOne()
+        c.set('k0', 'abc')
+        c.add('k0', 'def')
+        c.add('k1', 'ghi')
+        self.assertEqual(c.get_multi(['k0', 'k1']), {'k0': 'abc', 'k1': 'ghi'})
+
+    def test_incr_normal(self):
+        c = self._makeOne()
+        c.set('k0', 41)
+        self.assertEqual(c.incr('k0'), 42)
+        self.assertEqual(c.incr('k1'), None)
+
+    def test_incr_hit_size_limit(self):
+        c = self._makeOne()
+        c._bucket_limit = 4
+        c.flush_all()
+        c.set('k0', 14)
+        c.set('key1', 27)
+        self.assertEqual(c._bucket0.size, 4)
+        self.assertEqual(c._bucket1.size, 2)
+        self.assertEqual(c.incr('k0'), 15)  # this moves k0 to bucket0
+        self.assertEqual(c._bucket0.size, 2)
+        self.assertEqual(c._bucket1.size, 4)
+
+
 class MockOptions:
+    cache_module_name = ''
+    cache_servers = ''
+    cache_prefix = ''
+    cache_local_mb = 1
+    cache_delta_size_limit = 10000
+
+class MockOptionsWithFakeCache:
     cache_module_name = 'relstorage.tests.fakecache'
     cache_servers = 'host:9999'
     cache_prefix = 'myprefix'
+    cache_local_mb = 1
+    cache_delta_size_limit = 10000
 
 class MockAdapter:
     def __init__(self):
         self.mover = MockObjectMover()
+        self.poller = MockPoller()
 
 class MockObjectMover:
     def __init__(self):
@@ -212,7 +489,16 @@
     def load_current(self, cursor, oid_int):
         return self.data.get(oid_int, (None, None))
 
+class MockPoller:
+    def __init__(self):
+        self.changes = []  # [(oid, tid)]
+    def list_changes(self, cursor, after_tid, last_tid):
+        return ((oid, tid) for (oid, tid) in self.changes
+                if tid > after_tid and tid <= last_tid)
+
 def test_suite():
     suite = unittest.TestSuite()
     suite.addTest(unittest.makeSuite(StorageCacheTests))
+    suite.addTest(unittest.makeSuite(LocalClientBucketTests))
+    suite.addTest(unittest.makeSuite(LocalClientTests))
     return suite
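
The checkpoint tests above only accept a shared cache entry of the form
"c0 c1": two integer tids with checkpoint0 >= checkpoint1.  Anything else
(missing, unparsable, or inverted) makes the instance start over at the
newly polled tid.  Here is a minimal sketch of that validation, consistent
with the tests but not taken from relstorage/cache.py (parse_checkpoints
is a hypothetical helper name):

    def parse_checkpoints(raw):
        """Return (checkpoint0, checkpoint1) from a 'c0 c1' string, or None."""
        if not raw:
            return None
        try:
            c0, c1 = map(int, raw.split())
        except ValueError:
            return None      # e.g. 'baddata'
        if c0 < c1:
            return None      # e.g. '60 70' is inverted
        return c0, c1

    parse_checkpoints('50 50')    # (50, 50)
    parse_checkpoints('baddata')  # None
    parse_checkpoints('60 70')    # None

When this comes back None, the tests expect the instance to fall back to
(new_polled_tid, new_polled_tid) locally and publish that pair, except in
test_after_poll_reinstate_checkpoints, where it republishes its own
earlier checkpoints to the shared cache instead.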

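LocalClientBucketTests pins down the size accounting: a string value
costs len(key) + len(value), a non-string value (the integers above)
costs only len(key), and a store that would push the bucket past its
byte limit raises SizeOverflow and leaves the previous value in place.
A dict-based sketch that satisfies those assertions (SketchBucket is an
illustrative stand-in, not the LocalClientBucket shipped in
relstorage/cache.py):

    class SizeOverflow(Exception):
        """A set would exceed the bucket's byte limit."""

    class SketchBucket(dict):
        def __init__(self, limit):
            dict.__init__(self)
            self.size = 0
            self.limit = limit

        def _cost(self, key, value):
            # Strings cost their length; other values cost nothing extra.
            return len(key) + (len(value) if isinstance(value, str) else 0)

        def __setitem__(self, key, value):
            new_size = self.size + self._cost(key, value)
            if key in self:
                new_size -= self._cost(key, self[key])
            if new_size > self.limit:
                raise SizeOverflow()  # keep the existing value untouched
            self.size = new_size
            dict.__setitem__(self, key, value)

        def __delitem__(self, key):
            self.size -= self._cost(key, self[key])
            dict.__delitem__(self, key)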

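test_lru then fixes the eviction policy built on two such buckets:
writes land in _bucket0; when _bucket0 would overflow, _bucket1 is
dropped, _bucket0 becomes the new _bucket1, and a fresh _bucket0 takes
the write; a read that only hits _bucket1 moves the entry back into
_bucket0.  A stripped-down sketch of just that rotation, reusing
SketchBucket from above (the real LocalClient also layers the
memcache-style API of get_multi, set_multi, add, and incr on top of
this idea):

    class SketchLocalClient:
        def __init__(self, bucket_limit):
            self._bucket_limit = bucket_limit
            self._bucket0 = SketchBucket(bucket_limit)  # current generation
            self._bucket1 = SketchBucket(bucket_limit)  # previous generation

        def set(self, key, value):
            try:
                self._bucket0[key] = value
            except SizeOverflow:
                # Rotate: discard the old generation, demote the
                # current one, and retry in a fresh bucket0.
                self._bucket1 = self._bucket0
                self._bucket0 = SketchBucket(self._bucket_limit)
                self._bucket0[key] = value

        _MISS = object()

        def get(self, key):
            value = self._bucket0.get(key, self._MISS)
            if value is self._MISS:
                value = self._bucket1.get(key, self._MISS)
                if value is self._MISS:
                    return None
                # A hit in the previous generation is promoted.
                del self._bucket1[key]
                self.set(key, value)
            return value

Values bigger than a whole bucket are not handled here; presumably that
is what LocalClient's separate _value_limit is for.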
