[Checkins] SVN: relstorage/trunk/ When both poll-interval and cache-servers are set, we now poll the cache
Shane Hathaway
shane at hathawaymix.org
Mon Jan 26 18:24:16 EST 2009
Log message for revision 95056:
When both poll-interval and cache-servers are set, we now poll the cache
primarily, falling back to polling the database after poll-interval has
passed since the last poll. This means we can now use a large
poll-interval while keeping all clients up to date.
Also increased test coverage.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/component.xml
U relstorage/trunk/relstorage/relstorage.py
U relstorage/trunk/relstorage/tests/fakecache.py
U relstorage/trunk/relstorage/tests/reltestbase.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/CHANGES.txt 2009-01-26 23:24:15 UTC (rev 95056)
@@ -1,5 +1,10 @@
Next Release
+- When both cache-servers and poll-interval are set, we now poll the
+ cache for changes on every request. This makes it possible to use
+ a high poll-interval to reduce the database polling burden, yet
+ every client can see changes immediately.
+
- Added the pack-dry-run option, which causes pack operations to only
populate the pack tables with the list of objects and states to pack,
but not actually pack.
Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml 2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/relstorage/component.xml 2009-01-26 23:24:15 UTC (rev 95056)
@@ -26,18 +26,26 @@
</key>
<key name="poll-interval" datatype="float" required="no">
<description>
- Defer polling the database for the specified maximum time interval.
- Set to 0 (the default) to always poll. Fractional seconds are
- allowed.
+ Defer polling the database for the specified maximum time interval,
+ in seconds. Set to 0 (the default) to always poll. Fractional
+ seconds are allowed. Use this to lighten the database load on
+ servers with high read volume and low write volume.
- Use this to lighten the database load on servers with high read
- volume and low write volume. A setting of 1-5 seconds is sufficient
- for most systems.
+ The poll-interval option works best in conjunction with
+ the cache-servers option. If both are enabled, RelStorage will
+ poll a single cache key for changes on every request.
+ The database will not be polled unless the cache indicates
+ there have been changes, or the timeout specified by poll-interval
+ has expired. This configuration keeps clients fully up to date,
+ while removing much of the polling burden from the database.
+ A good cluster configuration is to use memcache servers
+ and a high poll-interval (say, 60 seconds).
- While this setting should not affect database integrity,
- it increases the probability of basing transactions on stale data,
- leading to conflicts. Thus a nonzero setting can hurt
- the performance of servers with high write volume.
+ This option can be used without the cache-servers option,
+ but a large poll-interval without cache-servers increases the
+ likelihood of basing transactions on stale data. While stale
+ reads do not affect database consistency, they do increase the
+ probability of conflict errors, leading to low performance.
</description>
</key>
<key name="pack-gc" datatype="boolean" default="true">
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/relstorage/relstorage.py 2009-01-26 23:24:15 UTC (rev 95056)
@@ -664,6 +664,15 @@
txn = self._prepared_txn
assert txn is not None
self._adapter.commit_phase2(self._store_cursor, txn)
+ cache = self._cache_client
+ if cache is not None:
+ if cache.incr('commit_count') is None:
+ # Use the current time as an initial commit_count value.
+ cache.add('commit_count', int(time.time()))
+ # A concurrent committer could have won the race to set the
+ # initial commit_count. Increment commit_count so that it
+ # doesn't matter who won.
+ cache.incr('commit_count')
self._prepared_txn = None
self._ltid = self._tid
self._tid = None
@@ -901,6 +910,10 @@
options=parent._options)
# _prev_polled_tid contains the tid at the previous poll
self._prev_polled_tid = None
+ # _commit_count contains the last polled value of the
+ # 'commit_count' cache key
+ self._commit_count = 0
+ # _poll_at is the time to poll regardless of commit_count
self._poll_at = 0
def _get_oid_cache_key(self, oid_int):
@@ -925,6 +938,30 @@
finally:
self._lock_release()
+ def need_poll(self):
+ """Return true if polling is needed"""
+ now = time.time()
+
+ cache = self._cache_client
+ if cache is not None:
+ new_commit_count = cache.get('commit_count')
+ if new_commit_count != self._commit_count:
+ # There is new data ready to poll
+ self._commit_count = new_commit_count
+ self._poll_at = now
+ return True
+
+ if not self._load_transaction_open:
+ # Since the load connection is closed or does not have
+ # a transaction in progress, polling is required.
+ return True
+
+ if now >= self._poll_at:
+ # The poll timeout has expired
+ return True
+
+ return False
+
def poll_invalidations(self):
"""Looks for OIDs of objects that changed since _prev_polled_tid
@@ -938,14 +975,10 @@
return {}
if self._options.poll_interval:
- now = time.time()
- if self._load_transaction_open and now < self._poll_at:
- # It's not yet time to poll again. The previous load
- # transaction is still open, so it's safe to
- # ignore this poll.
+ if not self.need_poll():
return {}
- # else poll now after resetting the timeout
- self._poll_at = now + self._options.poll_interval
+ # reset the timeout
+ self._poll_at = time.time() + self._options.poll_interval
self._restart_load()
conn = self._load_conn
Modified: relstorage/trunk/relstorage/tests/fakecache.py
===================================================================
--- relstorage/trunk/relstorage/tests/fakecache.py 2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/relstorage/tests/fakecache.py 2009-01-26 23:24:15 UTC (rev 95056)
@@ -14,14 +14,27 @@
"""A memcache-like module sufficient for testing without an actual memcache.
"""
+data = {}
+
class Client(object):
def __init__(self, servers):
self.servers = servers
- self.data = {}
def get(self, key):
- return self.data.get(key)
+ return data.get(key)
def set(self, key, value):
- self.data[key] = value
+ data[key] = value
+
+ def add(self, key, value):
+ if key not in data:
+ data[key] = value
+
+ def incr(self, key):
+ value = data.get(key)
+ if value is None:
+ return None
+ value = int(value) + 1
+ data[key] = value
+ return value
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2009-01-26 23:24:15 UTC (rev 95056)
@@ -16,6 +16,7 @@
import itertools
import time
from relstorage.relstorage import RelStorage
+from relstorage.tests import fakecache
from ZODB.DB import DB
from ZODB.utils import p64
@@ -42,13 +43,14 @@
def open(self, **kwargs):
adapter = self.make_adapter()
- self._storage = RelStorage(adapter, **kwargs)
+ self._storage = RelStorage(adapter, pack_gc=True, **kwargs)
def setUp(self):
self.open(create=1)
self._storage.zap_all()
def tearDown(self):
+ transaction.abort()
self._storage.close()
self._storage.cleanup()
@@ -231,25 +233,32 @@
def checkLoadFromCache(self):
# Store an object, cache it, then retrieve it from the cache
self._storage._options.cache_servers = 'x:1 y:2'
- self._storage._options.cache_module_name = 'relstorage.tests.fakecache'
+ self._storage._options.cache_module_name = fakecache.__name__
+ fakecache.data.clear()
db = DB(self._storage)
try:
c1 = db.open()
- cache = c1._storage._cache_client
- self.assertEqual(cache.servers, ['x:1', 'y:2'])
- self.assertEqual(len(cache.data), 0)
+ self.assertEqual(c1._storage._cache_client.servers, ['x:1', 'y:2'])
+ self.assertEqual(len(fakecache.data), 0)
r1 = c1.root()
- self.assertEqual(len(cache.data), 2)
+ # the root tid and state should now be cached
+ self.assertEqual(len(fakecache.data), 2)
r1['alpha'] = PersistentMapping()
+ self.assertFalse('commit_count' in fakecache.data)
transaction.commit()
+ self.assertTrue('commit_count' in fakecache.data)
+ self.assertEqual(len(fakecache.data), 3)
oid = r1['alpha']._p_oid
- self.assertEqual(len(cache.data), 2)
- got, serialno = c1._storage.load(oid, '')
- self.assertEqual(len(cache.data), 4)
- # load the object from the cache
- got, serialno = c1._storage.load(oid, '')
+ got, serial = c1._storage.load(oid, '')
+ # another tid and state should now be cached
+ self.assertEqual(len(fakecache.data), 5)
+
+ # load the object via loadSerial()
+ got2 = c1._storage.loadSerial(oid, serial)
+ self.assertEqual(got, got2)
+
# try to load an object that doesn't exist
self.assertRaises(KeyError, c1._storage.load, 'bad.oid.', '')
finally:
@@ -291,7 +300,7 @@
finally:
db.close()
- def checkPollInterval(self):
+ def checkPollInterval(self, using_cache=False):
# Verify the poll_interval parameter causes RelStorage to
# delay invalidation polling.
self._storage._options.poll_interval = 3600
@@ -318,9 +327,18 @@
# flush invalidations to c2, but the poll timer has not
# yet expired, so the change to r2 should not be seen yet.
self.assertTrue(c2._storage._poll_at > 0)
- c2._flush_invalidations()
- r2 = c2.root()
- self.assertEqual(r2['alpha'], 1)
+ if using_cache:
+ # The cache reveals that a poll is needed even though
+ # the poll timeout has not expired.
+ self.assertTrue(c2._storage.need_poll())
+ c2._flush_invalidations()
+ r2 = c2.root()
+ self.assertEqual(r2['alpha'], 2)
+ else:
+ self.assertFalse(c2._storage.need_poll())
+ c2._flush_invalidations()
+ r2 = c2.root()
+ self.assertEqual(r2['alpha'], 1)
# expire the poll timer and verify c2 sees the change
c2._storage._poll_at -= 3601
@@ -335,7 +353,13 @@
finally:
db.close()
+ def checkPollIntervalWithCache(self):
+ self._storage._options.cache_servers = 'x:1'
+ self._storage._options.cache_module_name = fakecache.__name__
+ fakecache.data.clear()
+ self.checkPollInterval(using_cache=True)
+
def checkTransactionalUndoIterator(self):
# this test overrides the broken version in TransactionalUndoStorage.
@@ -452,7 +476,7 @@
finally:
db.close()
- def checkPackGC(self, gc_enabled=True):
+ def checkPackGC(self, expect_object_deleted=True):
db = DB(self._storage)
try:
c1 = db.open()
@@ -473,7 +497,7 @@
packtime = time.time()
self._storage.pack(packtime, referencesf)
- if gc_enabled:
+ if expect_object_deleted:
# The object should now be gone
self.assertRaises(KeyError, self._storage.load, oid, '')
else:
@@ -484,8 +508,12 @@
def checkPackGCDisabled(self):
self._storage._options.pack_gc = False
- self.checkPackGC(gc_enabled=False)
+ self.checkPackGC(expect_object_deleted=False)
+ def checkPackGCDryRun(self):
+ self._storage._options.pack_dry_run = True
+ self.checkPackGC(expect_object_deleted=False)
+
def checkPackOldUnreferenced(self):
db = DB(self._storage)
try:
More information about the Checkins
mailing list