[Checkins] SVN: relstorage/trunk/ When both poll-interval and cache-servers are set, we now poll the cache

Shane Hathaway shane at hathawaymix.org
Mon Jan 26 18:24:16 EST 2009


Log message for revision 95056:
  When both poll-interval and cache-servers are set, we now poll the cache
  primarily, falling back to polling the database once poll-interval has
  elapsed since the last database poll.  This means we can now use a large
  poll-interval while keeping all clients up to date.
  
  Also increased test coverage.
  
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/component.xml
  U   relstorage/trunk/relstorage/relstorage.py
  U   relstorage/trunk/relstorage/tests/fakecache.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/CHANGES.txt	2009-01-26 23:24:15 UTC (rev 95056)
@@ -1,5 +1,10 @@
 Next Release
 
+- When both cache-servers and poll-interval are set, we now poll the
+  cache for changes on every request.  This makes it possible to use
+  a high poll-interval to reduce the database polling burden, yet
+  every client can see changes immediately.
+
 - Added the pack-dry-run option, which causes pack operations to only
   populate the pack tables with the list of objects and states to pack,
   but not actually pack.

Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml	2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/relstorage/component.xml	2009-01-26 23:24:15 UTC (rev 95056)
@@ -26,18 +26,26 @@
     </key>
     <key name="poll-interval" datatype="float" required="no">
       <description>
-        Defer polling the database for the specified maximum time interval.
-        Set to 0 (the default) to always poll.  Fractional seconds are
-        allowed.
+        Defer polling the database for the specified maximum time interval,
+        in seconds.  Set to 0 (the default) to always poll.  Fractional
+        seconds are allowed.  Use this to lighten the database load on
+        servers with high read volume and low write volume.
 
-        Use this to lighten the database load on servers with high read
-        volume and low write volume.  A setting of 1-5 seconds is sufficient
-        for most systems.
+        The poll-interval option works best in conjunction with
+        the cache-servers option.  If both are enabled, RelStorage will
+        poll a single cache key for changes on every request.
+        The database will not be polled unless the cache indicates
+        there have been changes, or the timeout specified by poll-interval
+        has expired.  This configuration keeps clients fully up to date,
+        while removing much of the polling burden from the database.
+        A good cluster configuration is to use memcache servers
+        and a high poll-interval (say, 60 seconds).
 
-        While this setting should not affect database integrity,
-        it increases the probability of basing transactions on stale data,
-        leading to conflicts.  Thus a nonzero setting can hurt
-        the performance of servers with high write volume.
+        This option can be used without the cache-servers option,
+        but a large poll-interval without cache-servers increases the
+        probability of basing transactions on stale data, which does not
+        affect database consistency, but does increase the probability
+        of conflict errors, leading to low performance.
       </description>
     </key>
     <key name="pack-gc" datatype="boolean" default="true">

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/relstorage/relstorage.py	2009-01-26 23:24:15 UTC (rev 95056)
@@ -664,6 +664,15 @@
         txn = self._prepared_txn
         assert txn is not None
         self._adapter.commit_phase2(self._store_cursor, txn)
+        cache = self._cache_client
+        if cache is not None:
+            if cache.incr('commit_count') is None:
+                # Use the current time as an initial commit_count value.
+                cache.add('commit_count', int(time.time()))
+                # A concurrent committer could have won the race to set the
+                # initial commit_count.  Increment commit_count so that it
+                # doesn't matter who won.
+                cache.incr('commit_count')
         self._prepared_txn = None
         self._ltid = self._tid
         self._tid = None
@@ -901,6 +910,10 @@
             options=parent._options)
         # _prev_polled_tid contains the tid at the previous poll
         self._prev_polled_tid = None
+        # _commit_count contains the last polled value of the
+        # 'commit_count' cache key
+        self._commit_count = 0
+        # _poll_at is the time to poll regardless of commit_count
         self._poll_at = 0
 
     def _get_oid_cache_key(self, oid_int):
@@ -925,6 +938,30 @@
         finally:
             self._lock_release()
 
+    def need_poll(self):
+        """Return true if polling is needed"""
+        now = time.time()
+
+        cache = self._cache_client
+        if cache is not None:
+            new_commit_count = cache.get('commit_count')
+            if new_commit_count != self._commit_count:
+                # There is new data ready to poll
+                self._commit_count = new_commit_count
+                self._poll_at = now
+                return True
+
+        if not self._load_transaction_open:
+            # Since the load connection is closed or does not have
+            # a transaction in progress, polling is required.
+            return True
+
+        if now >= self._poll_at:
+            # The poll timeout has expired
+            return True
+
+        return False
+
     def poll_invalidations(self):
         """Looks for OIDs of objects that changed since _prev_polled_tid
 
@@ -938,14 +975,10 @@
                 return {}
 
             if self._options.poll_interval:
-                now = time.time()
-                if self._load_transaction_open and now < self._poll_at:
-                    # It's not yet time to poll again.  The previous load
-                    # transaction is still open, so it's safe to
-                    # ignore this poll.
+                if not self.need_poll():
                     return {}
-                # else poll now after resetting the timeout
-                self._poll_at = now + self._options.poll_interval
+                # reset the timeout
+                self._poll_at = time.time() + self._options.poll_interval
 
             self._restart_load()
             conn = self._load_conn

Modified: relstorage/trunk/relstorage/tests/fakecache.py
===================================================================
--- relstorage/trunk/relstorage/tests/fakecache.py	2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/relstorage/tests/fakecache.py	2009-01-26 23:24:15 UTC (rev 95056)
@@ -14,14 +14,27 @@
 """A memcache-like module sufficient for testing without an actual memcache.
 """
 
+data = {}
+
 class Client(object):
 
     def __init__(self, servers):
         self.servers = servers
-        self.data = {}
 
     def get(self, key):
-        return self.data.get(key)
+        return data.get(key)
 
     def set(self, key, value):
-        self.data[key] = value
+        data[key] = value
+
+    def add(self, key, value):
+        if key not in data:
+            data[key] = value
+
+    def incr(self, key):
+        value = data.get(key)
+        if value is None:
+            return None
+        value = int(value) + 1
+        data[key] = value
+        return value

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2009-01-26 21:40:49 UTC (rev 95055)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2009-01-26 23:24:15 UTC (rev 95056)
@@ -16,6 +16,7 @@
 import itertools
 import time
 from relstorage.relstorage import RelStorage
+from relstorage.tests import fakecache
 
 from ZODB.DB import DB
 from ZODB.utils import p64
@@ -42,13 +43,14 @@
 
     def open(self, **kwargs):
         adapter = self.make_adapter()
-        self._storage = RelStorage(adapter, **kwargs)
+        self._storage = RelStorage(adapter, pack_gc=True, **kwargs)
 
     def setUp(self):
         self.open(create=1)
         self._storage.zap_all()
 
     def tearDown(self):
+        transaction.abort()
         self._storage.close()
         self._storage.cleanup()
 
@@ -231,25 +233,32 @@
     def checkLoadFromCache(self):
         # Store an object, cache it, then retrieve it from the cache
         self._storage._options.cache_servers = 'x:1 y:2'
-        self._storage._options.cache_module_name = 'relstorage.tests.fakecache'
+        self._storage._options.cache_module_name = fakecache.__name__
+        fakecache.data.clear()
 
         db = DB(self._storage)
         try:
             c1 = db.open()
-            cache = c1._storage._cache_client
-            self.assertEqual(cache.servers, ['x:1', 'y:2'])
-            self.assertEqual(len(cache.data), 0)
+            self.assertEqual(c1._storage._cache_client.servers, ['x:1', 'y:2'])
+            self.assertEqual(len(fakecache.data), 0)
             r1 = c1.root()
-            self.assertEqual(len(cache.data), 2)
+            # the root tid and state should now be cached
+            self.assertEqual(len(fakecache.data), 2)
             r1['alpha'] = PersistentMapping()
+            self.assertFalse('commit_count' in fakecache.data)
             transaction.commit()
+            self.assertTrue('commit_count' in fakecache.data)
+            self.assertEqual(len(fakecache.data), 3)
             oid = r1['alpha']._p_oid
 
-            self.assertEqual(len(cache.data), 2)
-            got, serialno = c1._storage.load(oid, '')
-            self.assertEqual(len(cache.data), 4)
-            # load the object from the cache
-            got, serialno = c1._storage.load(oid, '')
+            got, serial = c1._storage.load(oid, '')
+            # another tid and state should now be cached
+            self.assertEqual(len(fakecache.data), 5)
+
+            # load the object via loadSerial()
+            got2 = c1._storage.loadSerial(oid, serial)
+            self.assertEqual(got, got2)
+
             # try to load an object that doesn't exist
             self.assertRaises(KeyError, c1._storage.load, 'bad.oid.', '')
         finally:
@@ -291,7 +300,7 @@
         finally:
             db.close()
 
-    def checkPollInterval(self):
+    def checkPollInterval(self, using_cache=False):
         # Verify the poll_interval parameter causes RelStorage to
         # delay invalidation polling.
         self._storage._options.poll_interval = 3600
@@ -318,9 +327,18 @@
             # flush invalidations to c2, but the poll timer has not
             # yet expired, so the change to r2 should not be seen yet.
             self.assertTrue(c2._storage._poll_at > 0)
-            c2._flush_invalidations()
-            r2 = c2.root()
-            self.assertEqual(r2['alpha'], 1)
+            if using_cache:
+                # The cache reveals that a poll is needed even though
+                # the poll timeout has not expired.
+                self.assertTrue(c2._storage.need_poll())
+                c2._flush_invalidations()
+                r2 = c2.root()
+                self.assertEqual(r2['alpha'], 2)
+            else:
+                self.assertFalse(c2._storage.need_poll())
+                c2._flush_invalidations()
+                r2 = c2.root()
+                self.assertEqual(r2['alpha'], 1)
 
             # expire the poll timer and verify c2 sees the change
             c2._storage._poll_at -= 3601
@@ -335,7 +353,13 @@
         finally:
             db.close()
 
+    def checkPollIntervalWithCache(self):
+        self._storage._options.cache_servers = 'x:1'
+        self._storage._options.cache_module_name = fakecache.__name__
+        fakecache.data.clear()
+        self.checkPollInterval(using_cache=True)
 
+
     def checkTransactionalUndoIterator(self):
         # this test overrides the broken version in TransactionalUndoStorage.
 
@@ -452,7 +476,7 @@
         finally:
             db.close()
 
-    def checkPackGC(self, gc_enabled=True):
+    def checkPackGC(self, expect_object_deleted=True):
         db = DB(self._storage)
         try:
             c1 = db.open()
@@ -473,7 +497,7 @@
                 packtime = time.time()
             self._storage.pack(packtime, referencesf)
 
-            if gc_enabled:
+            if expect_object_deleted:
                 # The object should now be gone
                 self.assertRaises(KeyError, self._storage.load, oid, '')
             else:
@@ -484,8 +508,12 @@
 
     def checkPackGCDisabled(self):
         self._storage._options.pack_gc = False
-        self.checkPackGC(gc_enabled=False)
+        self.checkPackGC(expect_object_deleted=False)
 
+    def checkPackGCDryRun(self):
+        self._storage._options.pack_dry_run = True
+        self.checkPackGC(expect_object_deleted=False)
+
     def checkPackOldUnreferenced(self):
         db = DB(self._storage)
         try:



More information about the Checkins mailing list