[Checkins] SVN: relstorage/trunk/ The memcache integration now uses checkpoints, which
Shane Hathaway
shane at hathawaymix.org
Sat Oct 17 19:19:15 EDT 2009
Log message for revision 105119:
The memcache integration now uses checkpoints, which
will theoretically yield a high cache hit rate even when
committing many transactions.
Also added process-wide caching similar to a ZEO cache.
The new cache has the same interface as memcache.
Now we have a multi-level cache architecture (see the sketch after this list):
- L1 (cPickleCache)
- L2 (process-wide cache)
- L3 (memcached)
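
For illustration, here is a minimal, self-contained sketch of the layered lookup this architecture implies. It is not RelStorage's actual API: DictCache and load_state are hypothetical stand-ins for the L2/L3 clients and for StorageCache.load; the real code additionally keys states by checkpoint and delta transaction IDs and queries several candidate keys at once with get_multi. The key format matches the new '<prefix>:state:<tid>:<oid>' convention introduced in this commit.

class DictCache(object):
    """Stands in for one cache layer: LocalClient (L2) or a memcache client (L3)."""
    def __init__(self):
        self.data = {}
    def get(self, key):
        return self.data.get(key)
    def set(self, key, value):
        self.data[key] = value

def load_state(oid_int, tid_int, caches, load_from_db, prefix='zzz'):
    """Return the state for oid_int as of tid_int.

    caches is ordered from most local to most global, mirroring
    StorageCache.clients_local_first.  A hit in an outer layer
    back-fills the layers that missed; a miss everywhere falls
    back to the database and populates every layer.
    """
    key = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
    missed = []
    for cache in caches:
        state = cache.get(key)
        if state is not None:
            # Copy the state into the more local layers that missed.
            for inner in missed:
                inner.set(key, state)
            return state
        missed.append(cache)
    # Miss in every cache layer: load from the database and cache it.
    state = load_from_db(oid_int)
    for cache in caches:
        cache.set(key, state)
    return state

if __name__ == '__main__':
    l2, l3 = DictCache(), DictCache()       # process-wide cache, memcached
    db = {2: 'pickled state of oid 2'}      # toy stand-in for the database
    print(load_state(2, 55, [l2, l3], db.get))  # miss: loaded from the database
    print(load_state(2, 55, [l2, l3], db.get))  # hit: served from the L2 layer

(The L1 layer, cPickleCache, lives inside each ZODB connection and holds unpickled objects, so it is outside this sketch.)
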
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/adapters/interfaces.py
U relstorage/trunk/relstorage/adapters/poller.py
U relstorage/trunk/relstorage/cache.py
U relstorage/trunk/relstorage/options.py
U relstorage/trunk/relstorage/storage.py
U relstorage/trunk/relstorage/tests/reltestbase.py
U relstorage/trunk/relstorage/tests/test_cache.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/CHANGES.txt 2009-10-17 23:19:15 UTC (rev 105119)
@@ -14,6 +14,9 @@
- Revised the way RelStorage uses memcached. Minimized the number of
trips to both the cache server and the database.
+- Added an in-process pickle cache that serves a function similar to the
+ ZEO cache.
+
- Added a wrapper module for pylibmc.
- Store operations now use multi-insert and multi-delete SQL
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py 2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2009-10-17 23:19:15 UTC (rev 105119)
@@ -340,10 +340,20 @@
of the exceptions listed in the disconnected_exceptions
attribute of the associated IConnectionManager.
- Returns (changed_oids, new_polled_tid).
+ Returns (changes, new_polled_tid), where changes is either
+ a list of (oid, tid) that have changed, or None to indicate
+ that the changes are too complex to list. new_polled_tid is
+ never None.
"""
+ def list_changes(cursor, after_tid, last_tid):
+ """Return the (oid, tid) values changed in a range of transactions.
+ The returned iterable must include all changes in the range
+ after_tid < tid <= last_tid.
+ """
+
+
class ISchemaInstaller(Interface):
"""Install the schema in the database, clear it, or uninstall it"""
Modified: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py 2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/adapters/poller.py 2009-10-17 23:19:15 UTC (rev 105119)
@@ -35,11 +35,15 @@
committed in that transaction will not be included in the list
of changed OIDs.
- Returns (changed_oids, new_polled_tid).
+ Returns (changes, new_polled_tid), where changes is either
+ a list of (oid, tid) that have changed, or None to indicate
+ that the changes are too complex to list. new_polled_tid is
+ never None.
"""
# find out the tid of the most recent transaction.
cursor.execute(self.poll_query)
new_polled_tid = cursor.fetchone()[0]
+ assert new_polled_tid is not None
if prev_polled_tid is None:
# This is the first time the connection has polled.
@@ -72,13 +76,13 @@
if new_polled_tid > prev_polled_tid:
if self.keep_history:
stmt = """
- SELECT zoid
+ SELECT zoid, tid
FROM current_object
WHERE tid > %(tid)s
"""
else:
stmt = """
- SELECT zoid
+ SELECT zoid, tid
FROM object_state
WHERE tid > %(tid)s
"""
@@ -89,9 +93,9 @@
stmt = intern(stmt % self.runner.script_vars)
cursor.execute(stmt, params)
- oids = [oid for (oid,) in cursor]
+ changes = list(cursor)
- return oids, new_polled_tid
+ return changes, new_polled_tid
else:
# We moved backward in time. This can happen after failover
@@ -110,3 +114,28 @@
# invalidating the whole cache is simpler.
return None, new_polled_tid
+ def list_changes(self, cursor, after_tid, last_tid):
+ """Return the (oid, tid) values changed in a range of transactions.
+
+ The returned iterable must include the latest changes in the range
+ after_tid < tid <= last_tid.
+ """
+ if self.keep_history:
+ stmt = """
+ SELECT zoid, tid
+ FROM current_object
+ WHERE tid > %(min_tid)s
+ AND tid <= %(max_tid)s
+ """
+ else:
+ stmt = """
+ SELECT zoid, tid
+ FROM object_state
+ WHERE tid > %(min_tid)s
+ AND tid <= %(max_tid)s
+ """
+ params = {'min_tid': after_tid, 'max_tid': last_tid}
+ stmt = intern(stmt % self.runner.script_vars)
+
+ cursor.execute(stmt, params)
+ return list(cursor)
Modified: relstorage/trunk/relstorage/cache.py
===================================================================
--- relstorage/trunk/relstorage/cache.py 2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/cache.py 2009-10-17 23:19:15 UTC (rev 105119)
@@ -15,92 +15,193 @@
from relstorage.autotemp import AutoTemporaryFile
from ZODB.utils import p64
from ZODB.utils import u64
-import time
+import random
+import threading
class StorageCache(object):
"""RelStorage integration with memcached or similar.
+
+ Holds a list of memcache clients in order from most local to
+ most global. The first is a LocalClient, which stores the cache
+ in process but shares the cache between threads.
"""
- # send_limit: max approx. bytes to buffer before sending to the cache
+ # send_limit: approximate limit on the bytes to buffer before
+ # sending to the cache.
send_limit = 1024 * 1024
- def __init__(self, options):
- module_name = options.cache_module_name
- module = __import__(module_name, {}, {}, ['Client'])
- servers = options.cache_servers
- if isinstance(servers, basestring):
- servers = servers.split()
- self.client = module.Client(servers)
+ # queue is an AutoTemporaryFile during transaction commit.
+ queue = None
+
+ # queue_contents is a map of {oid_int: (startpos, endpos)}
+ # during transaction commit.
+ queue_contents = None
+
+ # checkpoints, when set, is a tuple containing the integer
+ # transaction IDs of the two current checkpoints. checkpoint0 is
+ # greater than or equal to checkpoint1.
+ checkpoints = None
+
+ # current_tid contains the last polled transaction ID. Invariant:
+ # when self.checkpoints is not None, self.delta_after0 has info
+ # from all transactions in the range:
+ # self.checkpoints[0] < tid <= self.current_tid
+ current_tid = 0
+
+ # commit_count contains the last polled value of the
+ # :commits cache key. The most global client currently
+ # responding stores the value.
+ commit_count = object()
+
+ def __init__(self, adapter, options, local_client=None):
+ self.adapter = adapter
+ self.options = options
+ if local_client is None:
+ local_client = LocalClient(options)
+ self.clients_local_first = [local_client]
+
+ if options.cache_servers:
+ module_name = options.cache_module_name
+ module = __import__(module_name, {}, {}, ['Client'])
+ servers = options.cache_servers
+ if isinstance(servers, basestring):
+ servers = servers.split()
+ self.clients_local_first.append(module.Client(servers))
+
+ # self.clients_local_first is in order from local to global caches,
+ # while self.clients_global_first is in order from global to local.
+ self.clients_global_first = list(self.clients_local_first)
+ self.clients_global_first.reverse()
+
+ # every cache key has a prefix
self.prefix = options.cache_prefix or ''
- # queue is an AutoTemporaryFile during txn commit.
- self.queue = None
+ # commit_count_key contains a number that is incremented
+ # for every commit. See tpc_finish().
+ self.commit_count_key = '%s:commits' % self.prefix
- # queue_contents is a map of {oid: (startpos, endpos)}
- # during txn commit.
- self.queue_contents = None
+ # checkpoints_key holds the current checkpoints.
+ self.checkpoints_key = '%s:checkpoints' % self.prefix
- # commit_count_key is the cache key to poll for changes
- self.commit_count_key = '%s:commit_count' % self.prefix
+ # delta_after0 contains {oid: tid} after checkpoint 0
+ # and before or at self.current_tid.
+ self.delta_after0 = {}
- # polled_commit_count contains the last polled value of the
- # 'commit_count' cache key
- self.polled_commit_count = 0
+ # delta_after1 contains {oid: tid} after checkpoint 1 and
+ # before or at checkpoint 0. The content of delta_after1 only
+ # changes when checkpoints move.
+ self.delta_after1 = {}
+ # delta_size_limit places an approximate limit on the number of
+ # entries in the delta_after maps.
+ self.delta_size_limit = options.cache_delta_size_limit
+
+ def new_instance(self):
+ """Return a copy of this instance sharing the same local client"""
+ local_client = self.clients_local_first[0]
+ return StorageCache(self.adapter, self.options, local_client)
+
def flush_all(self):
"""Remove all data from the cache. Called by RelStorage.zap_all()"""
- self.client.flush_all()
+ for client in self.clients_local_first:
+ client.flush_all()
+ self.checkpoints = None
+ self.delta_after0 = {}
+ self.delta_after1 = {}
- def load(self, cursor, oid_int, prev_polled_tid, adapter):
+ def load(self, cursor, oid_int):
"""Load the given object from cache if possible.
Fall back to loading from the database.
"""
- client = self.client
- state_key = '%s:state:%d' % (self.prefix, oid_int)
- if prev_polled_tid:
- backptr_key = '%s:back:%d:%d' % (
- self.prefix, prev_polled_tid, oid_int)
- v = client.get_multi([state_key, backptr_key])
- if v is not None:
- cache_data = v.get(state_key)
- backptr = v.get(backptr_key)
- else:
- cache_data = None
- backptr = None
- else:
- cache_data = client.get(state_key)
- backptr = None
+ if not self.checkpoints:
+ # No poll has occurred yet. For safety, don't use the cache.
+ return self.adapter.mover.load_current(cursor, oid_int)
- state = None
- if cache_data and len(cache_data) >= 8:
- # validate the cache result
- tid = cache_data[:8]
- tid_int = u64(tid)
- if tid_int == prev_polled_tid or tid == backptr:
- # the cached data is current.
- state = cache_data[8:]
+ prefix = self.prefix
- if state is None:
- # could not load from cache, so get from the database
- state, tid_int = adapter.mover.load_current(
+ # Get the object from the transaction specified
+ # by the following values, in order:
+ #
+ # 1. delta_after0[oid_int]
+ # 2. checkpoints[0]
+ # 3. delta_after1[oid_int]
+ # 4. checkpoints[1]
+ # 5. The database.
+ #
+ # checkpoints[0] is the preferred location.
+ #
+ # If delta_after0 contains oid_int, we should not look at any
+ # other cache keys, since the tid_int specified in delta_after0
+ # replaces all older transaction IDs. Similarly, if
+ # delta_after1 contains oid_int, we should not look at
+ # checkpoints[1]. Also, when both checkpoints are set to the
+ # same transaction ID, we don't need to ask for the same key
+ # twice.
+
+ tid_int = self.delta_after0.get(oid_int)
+ if tid_int:
+ # This object changed after checkpoint0, so
+ # there is only one place to look for its state.
+ cachekey = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
+ for client in self.clients_local_first:
+ cache_data = client.get(cachekey)
+ if cache_data and len(cache_data) >= 8:
+ # cache hit
+ assert cache_data[:8] == p64(tid_int)
+ return cache_data[8:], tid_int
+ # cache miss
+ state, actual_tid_int = self.adapter.mover.load_current(
cursor, oid_int)
- state = str(state or '')
- if tid_int is not None:
- # cache the result
- to_cache = {}
- tid = p64(tid_int)
- new_cache_data = tid + state
- if new_cache_data != cache_data:
- to_cache[state_key] = new_cache_data
- if prev_polled_tid and prev_polled_tid != tid_int:
- to_cache[backptr_key] = tid
- if to_cache:
- client.set_multi(to_cache)
+ assert actual_tid_int == tid_int
+ for client in self.clients_local_first:
+ client.set(cachekey, '%s%s' % (p64(tid_int), state or ''))
+ return state, tid_int
+ # Make a list of cache keys to query. The list will have either
+ # 1 or 2 keys.
+ cp0, cp1 = self.checkpoints
+ cachekeys = []
+ cp0_key = '%s:state:%d:%d' % (prefix, cp0, oid_int)
+ cachekeys.append(cp0_key)
+ da1_key = None
+ cp1_key = None
+ tid_int = self.delta_after1.get(oid_int)
+ if tid_int:
+ da1_key = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
+ cachekeys.append(da1_key)
+ elif cp1 != cp0:
+ cp1_key = '%s:state:%d:%d' % (prefix, cp1, oid_int)
+ cachekeys.append(cp1_key)
+
+ for client in self.clients_local_first:
+ # Query the cache. Query multiple keys simultaneously to
+ # minimize latency.
+ response = client.get_multi(cachekeys)
+ if response:
+ cache_data = response.get(cp0_key)
+ if cache_data and len(cache_data) >= 8:
+ # cache hit on the preferred cache key
+ return cache_data[8:], u64(cache_data[:8])
+
+ if da1_key:
+ cache_data = response.get(da1_key)
+ elif cp1_key:
+ cache_data = response.get(cp1_key)
+ if cache_data and len(cache_data) >= 8:
+ # cache hit, but copy the state to
+ # the currently preferred key.
+ client.set(cp0_key, cache_data)
+ return cache_data[8:], u64(cache_data[:8])
+
+ # cache miss
+ state, tid_int = self.adapter.mover.load_current(cursor, oid_int)
+ if tid_int:
+ client.set(cp0_key, '%s%s' % (p64(tid_int), state or ''))
return state, tid_int
+
def tpc_begin(self):
"""Prepare temp space for objects to cache."""
self.queue = AutoTemporaryFile()
@@ -120,14 +221,15 @@
endpos = queue.tell()
self.queue_contents[oid_int] = (startpos, endpos)
- def tpc_vote(self, tid):
- """Now that the tid is chosen, send queued objects to the cache.
- """
- client = self.client
- assert len(tid) == 8
+ def send_queue(self, tid):
+ """Now that this tid is known, send all queued objects to the cache"""
+ tid_int = u64(tid)
send_size = 0
to_send = {}
+ prefix = self.prefix
+ # Order the queue by file position, which should help if the
+ # file is large and needs to be read sequentially from disk.
items = [
(startpos, endpos, oid_int)
for (oid_int, (startpos, endpos)) in self.queue_contents.items()
@@ -140,35 +242,70 @@
state = self.queue.read(length)
if len(state) != length:
raise AssertionError("Queued cache data is truncated")
- cachekey = '%s:state:%d' % (self.prefix, oid_int)
- to_send[cachekey] = '%s%s' % (tid, state)
- send_size += length + len(cachekey)
- if send_size >= self.send_limit:
- client.set_multi(to_send)
+ cachekey = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
+ item_size = length + len(cachekey)
+ if send_size and send_size + item_size >= self.send_limit:
+ for client in self.clients_local_first:
+ client.set_multi(to_send)
to_send.clear()
send_size = 0
+ to_send[cachekey] = '%s%s' % (tid, state)
+ send_size += item_size
if to_send:
- client.set_multi(to_send)
+ for client in self.clients_local_first:
+ client.set_multi(to_send)
self.queue_contents.clear()
self.queue.seek(0)
+ def after_tpc_finish(self, tid):
+ """Update the commit count in the cache.
- def tpc_finish(self):
- """Update the commit count in the cache."""
- client = self.client
+ This is called after the database commit lock is released,
+ but before releasing the storage lock that will allow other
+ threads to use this instance.
+ """
+ tid_int = u64(tid)
+
+ # Why do we cache a commit count instead of the transaction ID?
+ # Here's why. This method gets called after the commit lock is
+ # released; other threads or processes could have committed
+ # more transactions in the time that has passed since releasing
+ # the lock, so a cached transaction ID would cause a race. It
+ # also wouldn't work to cache the transaction ID before
+ # releasing the commit lock, since that could cause some
+ # threads or processes watching the cache for changes to think
+ # they are up to date when they are not. The commit count
+ # solves these problems by ensuring that every commit is
+ # followed by a change to the cache that does not conflict with
+ # concurrent committers.
cachekey = self.commit_count_key
- if client.incr(cachekey) is None:
- # Use the current time as an initial commit_count value.
- client.add(cachekey, int(time.time()))
- # A concurrent committer could have won the race to set the
- # initial commit_count. Increment commit_count so that it
- # doesn't matter who won.
- client.incr(cachekey)
+ for client in self.clients_global_first:
+ if client.incr(cachekey) is None:
+ # Initialize commit_count.
+ # Use a random number for the base.
+ client.add(cachekey, random.randint(1, 1<<31))
+ # A concurrent committer could have won the race to set the
+ # initial commit_count. Increment commit_count so that it
+ # doesn't matter who won.
+ if client.incr(cachekey) is not None:
+ break
+ # else the client is dead. Fall back to the next client.
+ if self.checkpoints:
+ for oid_int in self.queue_contents:
+ # Future cache lookups for oid_int should now use
+ # the tid just committed.
+ self.delta_after0[oid_int] = tid_int
+
+ self.send_queue(tid)
+
def clear_temp(self):
- """Clear any transactional data. Called after txn finish or abort."""
+ """Discard all transaction-specific temporary data.
+
+ Called after transaction finish or abort.
+ """
self.queue_contents = None
if self.queue is not None:
self.queue.close()
@@ -176,8 +313,296 @@
def need_poll(self):
"""Return True if the commit count has changed"""
- new_commit_count = self.client.get(self.commit_count_key)
- if new_commit_count != self.polled_commit_count:
- self.polled_commit_count = new_commit_count
+ for client in self.clients_global_first:
+ new_commit_count = client.get(self.commit_count_key)
+ if new_commit_count is not None:
+ break
+ if new_commit_count != self.commit_count:
+ self.commit_count = new_commit_count
return True
return False
+
+ def after_poll(self, cursor, prev_tid_int, new_tid_int, changes):
+ """Update checkpoint data after a database poll.
+
+ cursor is connected to a load connection.
+
+ changes lists all [(oid_int, tid_int)] changed after
+ prev_tid_int, up to and including new_tid_int, excluding the
+ changes last committed by the associated storage instance.
+ changes can be None to indicate too many objects changed
+ to list them all.
+
+ prev_tid_int can be None, in which case the changes
+ parameter will be ignored. new_tid_int cannot be None.
+ """
+ new_checkpoints = None
+ for client in self.clients_global_first:
+ s = client.get(self.checkpoints_key)
+ if s:
+ try:
+ c0, c1 = s.split()
+ c0 = int(c0)
+ c1 = int(c1)
+ except ValueError:
+ # Invalid checkpoint cache value; ignore it.
+ pass
+ else:
+ if c0 >= c1:
+ new_checkpoints = (c0, c1)
+ break
+
+ if not new_checkpoints:
+ new_checkpoints = (new_tid_int, new_tid_int)
+
+ if not self.checkpoints:
+ # Initialize the checkpoints.
+ for client in self.clients_global_first:
+ client.set(
+ self.checkpoints_key, '%d %d' % new_checkpoints)
+ else:
+ # Suggest reinstatement of the former checkpoints, but
+ # use new checkpoints for this instance. Using new
+ # checkpoints ensures that we don't build up
+ # self.delta_after0 in case the cache is offline.
+ for client in self.clients_global_first:
+ client.set(
+ self.checkpoints_key, '%d %d' % self.checkpoints)
+
+ self.checkpoints = new_checkpoints
+ self.delta_after0 = {}
+ self.delta_after1 = {}
+ self.current_tid = new_tid_int
+ return
+
+ cp0, cp1 = new_checkpoints
+ allow_shift = True
+ if cp0 > new_tid_int:
+ # checkpoint0 is in a future that this instance can't
+ # yet see. Ignore the checkpoint change for now.
+ new_checkpoints = self.checkpoints
+ allow_shift = False
+
+ if (new_checkpoints == self.checkpoints
+ and changes is not None
+ and prev_tid_int
+ and prev_tid_int <= self.current_tid
+ and new_tid_int >= self.current_tid
+ ):
+ # Keep the checkpoints and update self.delta_after0.
+ m = self.delta_after0
+ m_get = m.get
+ for oid_int, tid_int in changes:
+ my_tid_int = m_get(oid_int)
+ if my_tid_int is None or tid_int > my_tid_int:
+ m[oid_int] = tid_int
+ self.current_tid = new_tid_int
+ else:
+ # Use the checkpoints specified by the cache.
+ # Rebuild delta_after0 and delta_after1.
+ new_delta_after0 = {}
+ new_delta_after1 = {}
+ # poller.list_changes provides an iterator of (oid, tid) where
+ # tid > after_tid and tid <= last_tid.
+ changes = self.adapter.poller.list_changes(
+ cursor, cp1, new_tid_int)
+ for oid_int, tid_int in changes:
+ if tid_int > cp0:
+ new_delta_after0[oid_int] = tid_int
+ elif tid_int > cp1:
+ new_delta_after1[oid_int] = tid_int
+ self.checkpoints = new_checkpoints
+ self.delta_after0 = new_delta_after0
+ self.delta_after1 = new_delta_after1
+ self.current_tid = new_tid_int
+
+ if allow_shift and len(self.delta_after0) >= self.delta_size_limit:
+ # delta_after0 has reached its limit. The way to
+ # shrink it is to shift the checkpoints. Suggest
+ # shifted checkpoints for future polls.
+ self._suggest_shifted_checkpoints(new_tid_int)
+
+
+ def _suggest_shifted_checkpoints(self, tid_int):
+ """Suggest that future polls use a new pair of checkpoints.
+
+ This does nothing if another instance has already shifted
+ the checkpoints.
+
+ checkpoint0 shifts to checkpoint1 and the tid just committed
+ becomes checkpoint0.
+ """
+ cp0, cp1 = self.checkpoints
+ assert tid_int > cp0
+ expect = '%d %d' % self.checkpoints
+ change_to = '%d %d' % (tid_int, cp0)
+ for client in self.clients_global_first:
+ old_value = client.get(self.checkpoints_key)
+ if old_value:
+ break
+ if not old_value or old_value == expect:
+ # Shift the checkpoints.
+ # Although this is a race with other instances, the race
+ # should not matter.
+ for client in self.clients_global_first:
+ client.set(self.checkpoints_key, change_to)
+ # The poll code will later see the new checkpoints
+ # and update self.checkpoints and self.delta_after(0|1).
+
+
+class SizeOverflow(Exception):
+ """Too much memory would be consumed by a new key"""
+
+class LocalClientBucket(dict):
+ """A map that keeps a record of its approx. size.
+
+ Keys must be strings, and most values are strings.
+ """
+
+ def __init__(self, limit):
+ self.size = 0
+ self.limit = limit
+ self._super = super(LocalClientBucket, self)
+
+ def __setitem__(self, key, value):
+ """Set an item.
+
+ Throws SizeOverflow if the new item would cause this map to
+ surpass its memory limit.
+ """
+ if isinstance(value, basestring):
+ sizedelta = len(value)
+ else:
+ sizedelta = 0
+ if key in self:
+ oldvalue = self[key]
+ if isinstance(oldvalue, basestring):
+ sizedelta -= len(oldvalue)
+ else:
+ sizedelta += len(key)
+ if self.size + sizedelta > self.limit:
+ raise SizeOverflow()
+ self._super.__setitem__(key, value)
+ self.size += sizedelta
+ return True
+
+ def __delitem__(self, key):
+ oldvalue = self[key]
+ self._super.__delitem__(key)
+ sizedelta = len(key)
+ if isinstance(oldvalue, basestring):
+ sizedelta += len(oldvalue)
+ self.size -= sizedelta
+
+
+class LocalClient(object):
+ """A memcache-like object that stores in dictionaries"""
+
+ def __init__(self, options):
+ self._lock = threading.Lock()
+ self._lock_acquire = self._lock.acquire
+ self._lock_release = self._lock.release
+ self._bucket_limit = 1000000 * options.cache_local_mb / 2
+ self._value_limit = self._bucket_limit / 10
+ self._bucket0 = LocalClientBucket(self._bucket_limit)
+ self._bucket1 = LocalClientBucket(self._bucket_limit)
+
+ def flush_all(self):
+ self._lock_acquire()
+ try:
+ self._bucket0 = LocalClientBucket(self._bucket_limit)
+ self._bucket1 = LocalClientBucket(self._bucket_limit)
+ finally:
+ self._lock_release()
+
+ def get(self, key):
+ self._lock_acquire()
+ try:
+ value = self._bucket0.get(key)
+ if value is None:
+ value = self._bucket1.get(key)
+ if value is None:
+ return None
+ # This key is active, so move it to bucket0.
+ del self._bucket1[key]
+ self._set_one(key, value)
+ return value
+ finally:
+ self._lock_release()
+
+ def get_multi(self, keys):
+ res = {}
+ self._lock_acquire()
+ try:
+ for key in keys:
+ value = self._bucket0.get(key)
+ if value is None:
+ value = self._bucket1.get(key)
+ if value is None:
+ continue
+ # This key is active, so move it to bucket0.
+ del self._bucket1[key]
+ self._set_one(key, value)
+ res[key] = value
+ finally:
+ self._lock_release()
+ return res
+
+ def _set_one(self, key, value):
+ try:
+ self._bucket0[key] = value
+ except SizeOverflow:
+ # shift bucket0 to bucket1
+ self._bucket1 = self._bucket0
+ self._bucket0 = LocalClientBucket(self._bucket_limit)
+ self._bucket0[key] = value
+
+ def set(self, key, value):
+ self.set_multi({key: value})
+
+ def set_multi(self, d, allow_replace=True):
+ res = {}
+ self._lock_acquire()
+ try:
+ for key, value in d.iteritems():
+ if isinstance(value, basestring):
+ if len(value) >= self._value_limit:
+ # This value is too big, so don't cache it.
+ continue
+
+ if key in self._bucket0:
+ if not allow_replace:
+ continue
+ del self._bucket0[key]
+
+ if key in self._bucket1:
+ if not allow_replace:
+ continue
+ del self._bucket1[key]
+
+ self._set_one(key, value)
+ res[key] = value
+
+ finally:
+ self._lock_release()
+ return res
+
+ def add(self, key, value):
+ self.set_multi({key: value}, allow_replace=False)
+
+ def incr(self, key):
+ self._lock_acquire()
+ try:
+ value = self._bucket0.get(key)
+ if value is None:
+ value = self._bucket1.get(key)
+ if value is None:
+ return None
+ # this key is active, so move it to bucket0
+ del self._bucket1[key]
+ res = int(value) + 1
+ self._set_one(key, res)
+ return res
+ finally:
+ self._lock_release()
+
Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py 2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/options.py 2009-10-17 23:19:15 UTC (rev 105119)
@@ -39,6 +39,8 @@
self.cache_servers = () # ['127.0.0.1:11211']
self.cache_module_name = 'memcache'
self.cache_prefix = ''
+ self.cache_local_mb = 4
+ self.cache_delta_size_limit = 10000
for key, value in kwoptions.iteritems():
if key in self.__dict__:
Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py 2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/storage.py 2009-10-17 23:19:15 UTC (rev 105119)
@@ -139,7 +139,7 @@
def __init__(self, adapter, name=None, create=True,
- options=None, **kwoptions):
+ options=None, cache=None, **kwoptions):
self._adapter = adapter
if options is None:
@@ -177,8 +177,10 @@
# but not yet used.
self._preallocated_oids = []
- if options.cache_servers:
- self._cache = StorageCache(options)
+ if cache is not None:
+ self._cache = cache
+ else:
+ self._cache = StorageCache(adapter, options)
if options.blob_dir:
from ZODB.blob import FilesystemHelper
@@ -270,16 +272,12 @@
"""
self._adapter.schema.zap_all()
self._rollback_load_connection()
- cache = self._cache
- if cache is not None:
- cache.flush_all()
+ self._cache.flush_all()
def clear_cache(self):
- """Clear all data from memcached. Used by speed tests.
+ """Clear all data from storage caches. Used by speed tests.
"""
- cache = self._cache
- if cache is not None:
- cache.flush_all()
+ self._cache.flush_all()
def release(self):
"""Release back end database sessions used by this storage instance.
@@ -311,8 +309,9 @@
See ZODB.interfaces.IMVCCStorage.
"""
adapter = self._adapter.new_instance()
+ cache = self._cache.new_instance()
other = RelStorage(adapter=adapter, name=self.__name__,
- create=False, options=self._options)
+ create=False, options=self._options, cache=cache)
self._instances.append(weakref.ref(other))
return other
@@ -421,14 +420,7 @@
if not self._load_transaction_open:
self._restart_load()
cursor = self._load_cursor
-
- if cache is None:
- state, tid_int = self._adapter.mover.load_current(
- cursor, oid_int)
- else:
- state, tid_int = cache.load(
- cursor, oid_int, self._prev_polled_tid, self._adapter)
-
+ state, tid_int = cache.load(cursor, oid_int)
finally:
self._lock_release()
@@ -531,6 +523,7 @@
assert self._prepared_txn is None
adapter = self._adapter
+ cache = self._cache
cursor = self._store_cursor
assert cursor is not None
oid_int = u64(oid)
@@ -545,9 +538,7 @@
# save the data in a temporary table
adapter.mover.store_temp(
cursor, self._batcher, oid_int, prev_tid_int, data)
- cache = self._cache
- if cache is not None:
- cache.store_temp(oid_int, data)
+ cache.store_temp(oid_int, data)
return None
finally:
self._lock_release()
@@ -608,11 +599,9 @@
self._restart_store()
adapter = self._adapter
+ self._cache.tpc_begin()
self._batcher = self._adapter.mover.make_batcher(
self._store_cursor)
- cache = self._cache
- if cache is not None:
- cache.tpc_begin()
if tid is not None:
# hold the commit lock and add the transaction now
@@ -676,9 +665,7 @@
self._max_stored_oid = 0
self._batcher = None
self._txn_blobs = None
- cache = self._cache
- if cache is not None:
- cache.clear_temp()
+ self._cache.clear_temp()
def _finish_store(self):
@@ -791,10 +778,6 @@
ZODB.blob.rename_or_copy_blob(sourcename, targetname)
self._txn_blobs[oid] = targetname
- cache = self._cache
- if cache is not None:
- cache.tpc_vote(self._tid)
-
return serials
@@ -828,9 +811,10 @@
self._adapter.txncontrol.commit_phase2(
self._store_conn, self._store_cursor, txn)
self._adapter.locker.release_commit_lock(self._store_cursor)
- cache = self._cache
- if cache is not None:
- cache.tpc_finish()
+ self._cache.after_tpc_finish(self._tid)
+
+ # N.B. only set _ltid after the commit succeeds,
+ # including cache updates.
self._ltid = self._tid
#if self._txn_blobs and not self._adapter.keep_history:
@@ -1180,12 +1164,10 @@
"""Return true if polling is needed"""
now = time.time()
- cache = self._cache
- if cache is not None:
- if cache.need_poll():
- # There is new data ready to poll
- self._poll_at = now
- return True
+ if self._cache.need_poll():
+ # There is new data ready to poll
+ self._poll_at = now
+ return True
if not self._load_transaction_open:
# Since the load connection is closed or does not have
@@ -1227,10 +1209,10 @@
# get a list of changed OIDs and the most recent tid
poll = self._adapter.poller.poll_invalidations
+ prev = self._prev_polled_tid
try:
- oid_ints, new_polled_tid = poll(
- self._load_conn, self._load_cursor,
- self._prev_polled_tid, ignore_tid)
+ changes, new_polled_tid = poll(
+ self._load_conn, self._load_cursor, prev, ignore_tid)
except self._adapter.connmanager.disconnected_exceptions, e:
log.warning("Reconnecting load_conn: %s", e)
self._drop_load_connection()
@@ -1240,17 +1222,19 @@
log.exception("Reconnect failed.")
raise
log.info("Reconnected.")
- oid_ints, new_polled_tid = poll(
- self._load_conn, self._load_cursor,
- self._prev_polled_tid, ignore_tid)
+ changes, new_polled_tid = poll(
+ self._load_conn, self._load_cursor, prev, ignore_tid)
+ self._cache.after_poll(
+ self._load_cursor, prev, new_polled_tid, changes)
+
self._prev_polled_tid = new_polled_tid
- if oid_ints is None:
+ if changes is None:
oids = None
else:
oids = {}
- for oid_int in oid_ints:
+ for oid_int, tid_int in changes:
oids[p64(oid_int)] = 1
return oids
finally:
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2009-10-17 23:19:15 UTC (rev 105119)
@@ -232,31 +232,33 @@
db = DB(self._storage)
try:
+ fakecache.data.clear()
c1 = db.open()
- self.assert_(c1._storage._cache.client.servers, ['x:1', 'y:2'])
- fakecache.data.clear()
+ self.assert_(c1._storage._cache.clients_global_first[0].servers,
+ ['x:1', 'y:2'])
r1 = c1.root()
# the root state should now be cached
- self.assertEqual(fakecache.data.keys(), ['zzz:state:0'])
+ self.assertEqual(len(fakecache.data), 2)
+ self.assertTrue('zzz:checkpoints' in fakecache.data)
+ self.assertEqual(sorted(fakecache.data.keys())[1][:10],
+ 'zzz:state:')
r1['alpha'] = PersistentMapping()
- self.assertFalse('zzz:commit_count' in fakecache.data)
+ self.assertFalse('zzz:commits' in fakecache.data)
transaction.commit()
- self.assertTrue('zzz:commit_count' in fakecache.data)
- self.assertEqual(sorted(fakecache.data.keys()),
- ['zzz:commit_count', 'zzz:state:0', 'zzz:state:1'])
+ self.assertTrue('zzz:commits' in fakecache.data)
+ self.assertEqual(len(fakecache.data.keys()), 5)
oid = r1['alpha']._p_oid
got, serial = c1._storage.load(oid, '')
# another state should now be cached
- self.assertEqual(len(fakecache.data.keys()), 3)
+ self.assertEqual(len(fakecache.data.keys()), 5)
# make a change
r1['beta'] = 0
transaction.commit()
+ self.assertEqual(len(fakecache.data.keys()), 6)
got, serial = c1._storage.load(oid, '')
- # a backpointer should now be cached
- self.assertEqual(len(fakecache.data.keys()), 4)
# try to load an object that doesn't exist
self.assertRaises(KeyError, c1._storage.load, 'bad.oid.', '')
@@ -311,7 +313,7 @@
finally:
db.close()
- def checkPollInterval(self, using_cache=False):
+ def checkPollInterval(self, using_cache=True):
# Verify the poll_interval parameter causes RelStorage to
# delay invalidation polling.
self._storage._options.poll_interval = 3600
@@ -347,6 +349,7 @@
self.assertEqual(r2['alpha'], 2)
self.assertFalse(c2._storage.need_poll())
else:
+ # Now confirm that no poll is needed
self.assertFalse(c2._storage.need_poll())
c2._flush_invalidations()
r2 = c2.root()
@@ -365,10 +368,8 @@
finally:
db.close()
- def checkPollIntervalWithCache(self):
- self._storage._options.cache_servers = 'x:1'
- self._storage._options.cache_module_name = fakecache.__name__
- fakecache.data.clear()
+ def checkPollIntervalWithoutCache(self):
+ self._storage._options.cache_local_mb = 0
self.checkPollInterval(using_cache=True)
def checkDoubleCommitter(self):
Modified: relstorage/trunk/relstorage/tests/test_cache.py
===================================================================
--- relstorage/trunk/relstorage/tests/test_cache.py 2009-10-17 22:58:43 UTC (rev 105118)
+++ relstorage/trunk/relstorage/tests/test_cache.py 2009-10-17 23:19:15 UTC (rev 105119)
@@ -26,103 +26,125 @@
from relstorage.cache import StorageCache
return StorageCache
+ def _makeOne(self):
+ return self.getClass()(MockAdapter(), MockOptionsWithFakeCache())
+
def test_ctor(self):
from relstorage.tests.fakecache import Client
- c = self.getClass()(MockOptions())
- self.assert_(isinstance(c.client, Client))
- self.assertEqual(c.client.servers, ['host:9999'])
+ c = self._makeOne()
+ self.assertEqual(len(c.clients_local_first), 2)
+ self.assertEqual(len(c.clients_global_first), 2)
+ self.assert_(isinstance(c.clients_global_first[0], Client))
+ self.assertEqual(c.clients_global_first[0].servers, ['host:9999'])
self.assertEqual(c.prefix, 'myprefix')
def test_flush_all(self):
from relstorage.tests.fakecache import data
data.clear()
- c = self.getClass()(MockOptions())
+ c = self._makeOne()
data['x'] = '1'
c.flush_all()
self.assert_(not data)
+ self.assertEqual(c.checkpoints, None)
+ self.assertEqual(c.delta_after0, {})
+ self.assertEqual(c.delta_after1, {})
- def test_load_from_current_transaction(self):
+ def test_load_without_checkpoints(self):
+ c = self._makeOne()
+ res = c.load(None, 2)
+ self.assertEqual(res, (None, None))
+
+ def test_load_using_delta_after0_hit(self):
from relstorage.tests.fakecache import data
from ZODB.utils import p64
- c = self.getClass()(MockOptions())
- tid_int = 50
- tid = p64(tid_int)
- data['myprefix:state:2'] = tid + 'STATE'
- state, got_tid_int = c.load(None, 2, tid_int, None)
- self.assertEqual(state, 'STATE')
- self.assertEqual(got_tid_int, tid_int)
+ adapter = MockAdapter()
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ c.checkpoints = (50, 40)
+ c.delta_after0[2] = 55
+ data['myprefix:state:55:2'] = p64(55) + 'abc'
+ res = c.load(None, 2)
+ self.assertEqual(res, ('abc', 55))
- def test_load_from_backptr(self):
+ def test_load_using_delta_after0_miss(self):
from relstorage.tests.fakecache import data
from ZODB.utils import p64
- c = self.getClass()(MockOptions())
- tid_int = 50
- tid = p64(tid_int)
- data['myprefix:state:2'] = tid + 'STATE'
- data['myprefix:back:60:2'] = tid
- state, got_tid_int = c.load(None, 2, 60, None)
- self.assertEqual(state, 'STATE')
- self.assertEqual(got_tid_int, tid_int)
+ adapter = MockAdapter()
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ c.checkpoints = (50, 40)
+ c.delta_after0[2] = 55
+ adapter.mover.data[2] = ('abc', 55)
+ res = c.load(None, 2)
+ self.assertEqual(res, ('abc', 55))
- def test_load_backptr_missing(self):
+ def test_load_using_checkpoint0_hit(self):
from relstorage.tests.fakecache import data
from ZODB.utils import p64
- c = self.getClass()(MockOptions())
- tid_int = 50
- tid = p64(tid_int)
- data['myprefix:state:2'] = tid + 'STATE'
adapter = MockAdapter()
- adapter.mover.data[2] = ('STATE', 50)
- state, got_tid_int = c.load(None, 2, 60, adapter)
- self.assertEqual(state, 'STATE')
- self.assertEqual(got_tid_int, 50)
- self.assertEqual(data, {
- 'myprefix:state:2': tid + 'STATE',
- 'myprefix:back:60:2': tid,
- })
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ c.checkpoints = (50, 40)
+ data['myprefix:state:50:2'] = p64(45) + 'xyz'
+ res = c.load(None, 2)
+ self.assertEqual(res, ('xyz', 45))
- def test_load_state_expired(self):
+ def test_load_using_checkpoint0_miss(self):
from relstorage.tests.fakecache import data
from ZODB.utils import p64
- c = self.getClass()(MockOptions())
- tid_int = 50
- tid = p64(tid_int)
- data['myprefix:state:2'] = tid + 'STATE'
adapter = MockAdapter()
- adapter.mover.data[2] = ('NEWSTATE', 55)
- state, got_tid_int = c.load(None, 2, 60, adapter)
- self.assertEqual(state, 'NEWSTATE')
- self.assertEqual(got_tid_int, 55)
- self.assertEqual(data, {
- 'myprefix:state:2': p64(55) + 'NEWSTATE',
- 'myprefix:back:60:2': p64(55),
- })
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ c.checkpoints = (50, 40)
+ adapter.mover.data[2] = ('xyz', 45)
+ res = c.load(None, 2)
+ self.assertEqual(res, ('xyz', 45))
+ self.assertEqual(data.get('myprefix:state:50:2'), p64(45) + 'xyz')
- def test_load_state_missing(self):
+ def test_load_using_delta_after1_hit(self):
from relstorage.tests.fakecache import data
from ZODB.utils import p64
- c = self.getClass()(MockOptions())
- tid_int = 50
- tid = p64(tid_int)
adapter = MockAdapter()
- adapter.mover.data[2] = ('NEWSTATE', 55)
- state, got_tid_int = c.load(None, 2, 60, adapter)
- self.assertEqual(state, 'NEWSTATE')
- self.assertEqual(got_tid_int, 55)
- self.assertEqual(data, {
- 'myprefix:state:2': p64(55) + 'NEWSTATE',
- 'myprefix:back:60:2': p64(55),
- })
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ c.checkpoints = (50, 40)
+ c.delta_after1[2] = 45
+ data['myprefix:state:45:2'] = p64(45) + 'abc'
+ res = c.load(None, 2)
+ self.assertEqual(res, ('abc', 45))
+ self.assertEqual(data.get('myprefix:state:50:2'), p64(45) + 'abc')
- def test_load_no_object(self):
- c = self.getClass()(MockOptions())
+ def test_load_using_delta_after1_miss(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
adapter = MockAdapter()
- state, got_tid_int = c.load(None, 2, 60, adapter)
- self.assertEqual(state, '')
- self.assertEqual(got_tid_int, None)
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ c.checkpoints = (50, 40)
+ c.delta_after1[2] = 45
+ adapter.mover.data[2] = ('abc', 45)
+ res = c.load(None, 2)
+ self.assertEqual(res, ('abc', 45))
+ self.assertEqual(data.get('myprefix:state:50:2'), p64(45) + 'abc')
+ def test_load_using_checkpoint1_hit(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ adapter = MockAdapter()
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ c.checkpoints = (50, 40)
+ data['myprefix:state:40:2'] = p64(35) + '123'
+ res = c.load(None, 2)
+ self.assertEqual(res, ('123', 35))
+ self.assertEqual(data.get('myprefix:state:50:2'), p64(35) + '123')
+
+ def test_load_using_checkpoint1_miss(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ adapter = MockAdapter()
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ c.checkpoints = (50, 40)
+ adapter.mover.data[2] = ('123', 35)
+ res = c.load(None, 2)
+ self.assertEqual(res, ('123', 35))
+ self.assertEqual(data.get('myprefix:state:50:2'), p64(35) + '123')
+
def test_store_temp(self):
- c = self.getClass()(MockOptions())
+ c = self._makeOne()
c.tpc_begin()
c.store_temp(2, 'abc')
c.store_temp(1, 'def')
@@ -131,80 +153,335 @@
c.queue.seek(0)
self.assertEqual(c.queue.read(), 'abcdefghi')
- def test_tpc_vote_small(self):
+ def test_send_queue_small(self):
from relstorage.tests.fakecache import data
from ZODB.utils import p64
- c = self.getClass()(MockOptions())
+ c = self._makeOne()
c.tpc_begin()
c.store_temp(2, 'abc')
c.store_temp(3, 'def')
tid = p64(55)
- c.tpc_vote(tid)
+ c.send_queue(tid)
self.assertEqual(data, {
- 'myprefix:state:2': tid + 'abc',
- 'myprefix:state:3': tid + 'def',
+ 'myprefix:state:55:2': tid + 'abc',
+ 'myprefix:state:55:3': tid + 'def',
})
- def test_tpc_vote_large(self):
+ def test_send_queue_large(self):
from relstorage.tests.fakecache import data
from ZODB.utils import p64
- c = self.getClass()(MockOptions())
+ c = self._makeOne()
c.send_limit = 100
c.tpc_begin()
c.store_temp(2, 'abc')
c.store_temp(3, 'def' * 100)
tid = p64(55)
- c.tpc_vote(tid)
+ c.send_queue(tid)
self.assertEqual(data, {
- 'myprefix:state:2': tid + 'abc',
- 'myprefix:state:3': tid + ('def' * 100),
+ 'myprefix:state:55:2': tid + 'abc',
+ 'myprefix:state:55:3': tid + ('def' * 100),
})
- def test_tpc_vote_none(self):
+ def test_send_queue_none(self):
from relstorage.tests.fakecache import data
from ZODB.utils import p64
- c = self.getClass()(MockOptions())
+ c = self._makeOne()
c.tpc_begin()
tid = p64(55)
- c.tpc_vote(tid)
+ c.send_queue(tid)
self.assertEqual(data, {})
- def test_tpc_finish(self):
+ def test_after_tpc_finish(self):
from relstorage.tests.fakecache import data
- c = self.getClass()(MockOptions())
- c.tpc_finish()
- count = data['myprefix:commit_count']
+ from ZODB.utils import p64
+ c = self._makeOne()
+ c.tpc_begin()
+ c.after_tpc_finish(p64(55))
+ count = data['myprefix:commits']
self.assert_(count > 0)
- c.tpc_finish()
- newcount = data['myprefix:commit_count']
+ c.after_tpc_finish(p64(55))
+ newcount = data['myprefix:commits']
self.assert_(newcount == count + 1)
def test_clear_temp(self):
- c = self.getClass()(MockOptions())
+ c = self._makeOne()
c.tpc_begin()
c.clear_temp()
self.assertEqual(c.queue_contents, None)
self.assertEqual(c.queue, None)
def test_need_poll(self):
- c = self.getClass()(MockOptions())
+ from ZODB.utils import p64
+ c = self._makeOne()
self.assertTrue(c.need_poll())
self.assertFalse(c.need_poll())
self.assertFalse(c.need_poll())
- c.tpc_finish()
+ c.tpc_begin()
+ c.after_tpc_finish(p64(55))
self.assertTrue(c.need_poll())
self.assertFalse(c.need_poll())
self.assertFalse(c.need_poll())
+ def test_after_poll_init_checkpoints(self):
+ from relstorage.tests.fakecache import data
+ c = self._makeOne()
+ c.after_poll(None, 40, 50, [])
+ self.assertEqual(c.checkpoints, (50, 50))
+ self.assertEqual(data['myprefix:checkpoints'], '50 50')
+ def test_after_poll_ignore_garbage_checkpoints(self):
+ from relstorage.tests.fakecache import data
+ data['myprefix:checkpoints'] = 'baddata'
+ c = self._makeOne()
+ c.after_poll(None, 40, 50, [])
+ self.assertEqual(c.checkpoints, (50, 50))
+ self.assertEqual(data['myprefix:checkpoints'], '50 50')
+
+ def test_after_poll_ignore_invalid_checkpoints(self):
+ from relstorage.tests.fakecache import data
+ data['myprefix:checkpoints'] = '60 70' # bad: c0 < c1
+ c = self._makeOne()
+ c.after_poll(None, 40, 50, [])
+ self.assertEqual(c.checkpoints, (50, 50))
+ self.assertEqual(data['myprefix:checkpoints'], '50 50')
+
+ def test_after_poll_reinstate_checkpoints(self):
+ from relstorage.tests.fakecache import data
+ c = self._makeOne()
+ c.checkpoints = (40, 30)
+ c.after_poll(None, 40, 50, [])
+ self.assertEqual(c.checkpoints, (50, 50))
+ self.assertEqual(data['myprefix:checkpoints'], '40 30')
+
+ def test_after_poll_future_checkpoints(self):
+ from relstorage.tests.fakecache import data
+ data['myprefix:checkpoints'] = '90 80'
+ c = self._makeOne()
+ c.checkpoints = (40, 30)
+ c.current_tid = 40
+ c.after_poll(None, 40, 50, [(2, 45)])
+ # This instance can't yet see txn 90, so it sticks with
+ # the existing checkpoints.
+ self.assertEqual(c.checkpoints, (40, 30))
+ self.assertEqual(data['myprefix:checkpoints'], '90 80')
+ self.assertEqual(c.delta_after0, {2: 45})
+ self.assertEqual(c.delta_after1, {})
+
+ def test_after_poll_retain_checkpoints(self):
+ from relstorage.tests.fakecache import data
+ data['myprefix:checkpoints'] = '40 30'
+ c = self._makeOne()
+ c.checkpoints = (40, 30)
+ c.current_tid = 40
+ c.delta_after1 = {1: 35}
+ c.after_poll(None, 40, 50, [(2, 45)])
+ self.assertEqual(c.checkpoints, (40, 30))
+ self.assertEqual(data['myprefix:checkpoints'], '40 30')
+ self.assertEqual(c.delta_after0, {2: 45})
+ self.assertEqual(c.delta_after1, {1: 35})
+
+ def test_after_poll_new_checkpoints(self):
+ from relstorage.tests.fakecache import data
+ data['myprefix:checkpoints'] = '50 40'
+ adapter = MockAdapter()
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ adapter.poller.changes = [(3, 42), (1, 35), (2, 45)]
+ c.checkpoints = (40, 30)
+ c.current_tid = 40
+ c.after_poll(None, 40, 50, [(2, 45)])
+ self.assertEqual(c.checkpoints, (50, 40))
+ self.assertEqual(data['myprefix:checkpoints'], '50 40')
+ self.assertEqual(c.delta_after0, {})
+ self.assertEqual(c.delta_after1, {2: 45, 3: 42})
+
+ def test_after_poll_gap(self):
+ from relstorage.tests.fakecache import data
+ data['myprefix:checkpoints'] = '40 30'
+ adapter = MockAdapter()
+ c = self.getClass()(adapter, MockOptionsWithFakeCache())
+ adapter.poller.changes = [(3, 42), (1, 35), (2, 45)]
+ c.checkpoints = (40, 30)
+ c.current_tid = 40
+ # provide a prev_tid_int that shows a gap in the polled
+ # transaction list, forcing a rebuild of delta_after(0|1).
+ c.after_poll(None, 43, 50, [(2, 45)])
+ self.assertEqual(c.checkpoints, (40, 30))
+ self.assertEqual(data['myprefix:checkpoints'], '40 30')
+ self.assertEqual(c.delta_after0, {2: 45, 3: 42})
+ self.assertEqual(c.delta_after1, {1: 35})
+
+ def test_after_poll_shift_checkpoints(self):
+ from relstorage.tests.fakecache import data
+ data['myprefix:checkpoints'] = '40 30'
+ c = self._makeOne()
+ c.delta_size_limit = 2
+ c.checkpoints = (40, 30)
+ c.current_tid = 40
+ c.after_poll(None, 40, 314, [(1, 45), (2, 46)])
+ self.assertEqual(c.checkpoints, (40, 30))
+ self.assertEqual(data['myprefix:checkpoints'], '314 40')
+ self.assertEqual(c.delta_after0, {1: 45, 2: 46})
+ self.assertEqual(c.delta_after1, {})
+
+
+class LocalClientBucketTests(unittest.TestCase):
+
+ def getClass(self):
+ from relstorage.cache import LocalClientBucket
+ return LocalClientBucket
+
+ def test_set_string_value(self):
+ b = self.getClass()(100)
+ self.assertEqual(b.size, 0)
+ b['abc'] = 'defghi'
+ self.assertEqual(b.size, 9)
+ b['abc'] = '123'
+ self.assertEqual(b.size, 6)
+ b['abc'] = ''
+ self.assertEqual(b.size, 3)
+ b['abc'] = 'defghi'
+ self.assertEqual(b.size, 9)
+ del b['abc']
+ self.assertEqual(b.size, 0)
+
+ def test_set_integer_value(self):
+ b = self.getClass()(100)
+ self.assertEqual(b.size, 0)
+ b['abc'] = 5
+ self.assertEqual(b.size, 3)
+ b['abc'] = -7
+ self.assertEqual(b.size, 3)
+ b['abc'] = 0
+ self.assertEqual(b.size, 3)
+ del b['abc']
+ self.assertEqual(b.size, 0)
+
+ def test_set_limit(self):
+ from relstorage.cache import SizeOverflow
+ b = self.getClass()(5)
+ self.assertEqual(b.size, 0)
+ b['abc'] = 'xy'
+ self.assertEqual(b.size, 5)
+ b['abc'] = 'z'
+ self.assertEqual(b.size, 4)
+ self.assertRaises(SizeOverflow, b.__setitem__, 'abc', 'xyz')
+ self.assertEqual(b['abc'], 'z')
+
+
+class LocalClientTests(unittest.TestCase):
+
+ def getClass(self):
+ from relstorage.cache import LocalClient
+ return LocalClient
+
+ def _makeOne(self):
+ return self.getClass()(MockOptions())
+
+ def test_ctor(self):
+ c = self._makeOne()
+ self.assertEqual(c._bucket_limit, 500000)
+ self.assertEqual(c._value_limit, 50000)
+
+ def test_set_and_get(self):
+ c = self._makeOne()
+ c.set('abc', 'def')
+ self.assertEqual(c.get('abc'), 'def')
+ self.assertEqual(c.get('xyz'), None)
+
+ def test_set_multi_and_get_multi(self):
+ c = self._makeOne()
+ c.set_multi({'k0': 'abc', 'k1': 'def'})
+ self.assertEqual(c.get_multi(['k0', 'k1']), {'k0': 'abc', 'k1': 'def'})
+ self.assertEqual(c.get_multi(['k0', 'k2']), {'k0': 'abc'})
+ self.assertEqual(c.get_multi(['k2', 'k3']), {})
+
+ def test_lru(self):
+ # LocalClient is a simple LRU cache. Confirm it keeps the right keys.
+ c = self._makeOne()
+ c._bucket_limit = 51
+ c.flush_all()
+ for i in range(5):
+ # add 10 bytes
+ c.set('k%d' % i, '01234567')
+ self.assertEqual(c._bucket0.size, 50)
+ self.assertEqual(c._bucket1.size, 0)
+ c.set('k5', '01234567')
+ self.assertEqual(c._bucket0.size, 10)
+ self.assertEqual(c._bucket1.size, 50)
+ v = c.get('k2')
+ self.assertEqual(v, '01234567')
+ self.assertEqual(c._bucket0.size, 20)
+ self.assertEqual(c._bucket1.size, 40)
+ for i in range(5):
+ # add 10 bytes
+ c.set('x%d' % i, '01234567')
+ self.assertEqual(c._bucket0.size, 20)
+ self.assertEqual(c._bucket1.size, 50)
+ self.assertEqual(c.get('x0'), '01234567')
+ self.assertEqual(c.get('x1'), '01234567')
+ self.assertEqual(c.get('x2'), '01234567')
+ self.assertEqual(c.get('x3'), '01234567')
+ self.assertEqual(c.get('x4'), '01234567')
+ self.assertEqual(c._bucket0.size, 50)
+ self.assertEqual(c._bucket1.size, 20)
+ self.assertEqual(c.get('k0'), None)
+ self.assertEqual(c.get('k1'), None)
+ self.assertEqual(c.get('k2'), '01234567')
+ self.assertEqual(c.get('k3'), None)
+ self.assertEqual(c.get('k4'), None)
+ self.assertEqual(c.get('k5'), None)
+
+ self.assertEqual(c._bucket0.size, 10)
+ self.assertEqual(c._bucket1.size, 50)
+
+ c.set('z0', '01234567')
+ self.assertEqual(c._bucket0.size, 20)
+ self.assertEqual(c._bucket1.size, 50)
+
+ def test_add(self):
+ c = self._makeOne()
+ c.set('k0', 'abc')
+ c.add('k0', 'def')
+ c.add('k1', 'ghi')
+ self.assertEqual(c.get_multi(['k0', 'k1']), {'k0': 'abc', 'k1': 'ghi'})
+
+ def test_incr_normal(self):
+ c = self._makeOne()
+ c.set('k0', 41)
+ self.assertEqual(c.incr('k0'), 42)
+ self.assertEqual(c.incr('k1'), None)
+
+ def test_incr_hit_size_limit(self):
+ c = self._makeOne()
+ c._bucket_limit = 4
+ c.flush_all()
+ c.set('k0', 14)
+ c.set('key1', 27)
+ self.assertEqual(c._bucket0.size, 4)
+ self.assertEqual(c._bucket1.size, 2)
+ self.assertEqual(c.incr('k0'), 15) # this moves k0 to bucket0
+ self.assertEqual(c._bucket0.size, 2)
+ self.assertEqual(c._bucket1.size, 4)
+
+
class MockOptions:
+ cache_module_name = ''
+ cache_servers = ''
+ cache_prefix = ''
+ cache_local_mb = 1
+ cache_delta_size_limit = 10000
+
+class MockOptionsWithFakeCache:
cache_module_name = 'relstorage.tests.fakecache'
cache_servers = 'host:9999'
cache_prefix = 'myprefix'
+ cache_local_mb = 1
+ cache_delta_size_limit = 10000
class MockAdapter:
def __init__(self):
self.mover = MockObjectMover()
+ self.poller = MockPoller()
class MockObjectMover:
def __init__(self):
@@ -212,7 +489,16 @@
def load_current(self, cursor, oid_int):
return self.data.get(oid_int, (None, None))
+class MockPoller:
+ def __init__(self):
+ self.changes = [] # [(oid, tid)]
+ def list_changes(self, cursor, after_tid, last_tid):
+ return ((oid, tid) for (oid, tid) in self.changes
+ if tid > after_tid and tid <= last_tid)
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(StorageCacheTests))
+ suite.addTest(unittest.makeSuite(LocalClientBucketTests))
+ suite.addTest(unittest.makeSuite(LocalClientTests))
return suite