[Checkins] SVN: relstorage/branches/1.1/ Merged from trunk (-r95006:95060)

Shane Hathaway shane at hathawaymix.org
Mon Jan 26 21:40:20 EST 2009


Log message for revision 95065:
  Merged from trunk (-r95006:95060)
  

Changed:
  U   relstorage/branches/1.1/CHANGES.txt
  U   relstorage/branches/1.1/relstorage/adapters/common.py
  U   relstorage/branches/1.1/relstorage/adapters/mysql.py
  U   relstorage/branches/1.1/relstorage/adapters/oracle.py
  U   relstorage/branches/1.1/relstorage/adapters/postgresql.py
  U   relstorage/branches/1.1/relstorage/component.xml
  U   relstorage/branches/1.1/relstorage/relstorage.py
  U   relstorage/branches/1.1/relstorage/tests/fakecache.py
  U   relstorage/branches/1.1/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/branches/1.1/CHANGES.txt
===================================================================
--- relstorage/branches/1.1/CHANGES.txt	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/CHANGES.txt	2009-01-27 02:40:19 UTC (rev 95065)
@@ -1,5 +1,10 @@
 Next Release
 
+- When both cache-servers and poll-interval are set, we now poll the
+  cache for changes on every request.  This makes it possible to use
+  a high poll-interval to reduce the database polling burden, while
+  still letting every client see changes immediately.
+
 - Added the pack-dry-run option, which causes pack operations to only
   populate the pack tables with the list of objects and states to pack,
   but not actually pack.

Modified: relstorage/branches/1.1/relstorage/adapters/common.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/common.py	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/adapters/common.py	2009-01-27 02:40:19 UTC (rev 95065)
@@ -176,7 +176,27 @@
             stmt = '\n'.join(lines)
             self._run_script_stmt(cursor, stmt, params)
 
+    def _open_and_call(self, callback):
+        """Call a function with an open connection and cursor.
 
+        If the function returns, commits the transaction and returns the
+        result returned by the function.
+        If the function raises an exception, aborts the transaction
+        then propagates the exception.
+        """
+        conn, cursor = self.open()
+        try:
+            try:
+                res = callback(conn, cursor)
+            except:
+                conn.rollback()
+                raise
+            else:
+                conn.commit()
+                return res
+        finally:
+            self.close(conn, cursor)
+
     def _transaction_iterator(self, cursor):
         """Iterate over a list of transactions returned from the database.
 
@@ -727,7 +747,7 @@
         pass
 
 
-    def pack(self, pack_tid, options):
+    def pack(self, pack_tid, options, sleep=time.sleep):
         """Pack.  Requires the information provided by pre_pack."""
 
         # Read committed mode is sufficient.
@@ -777,7 +797,7 @@
                             if delay > 0:
                                 log.debug('pack: sleeping %.4g second(s)',
                                     delay)
-                                time.sleep(delay)
+                                sleep(delay)
                         self._hold_commit_lock(cursor)
                         start = time.time()
 

Modified: relstorage/branches/1.1/relstorage/adapters/mysql.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/mysql.py	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/adapters/mysql.py	2009-01-27 02:40:19 UTC (rev 95065)
@@ -235,66 +235,39 @@
 
     def prepare_schema(self):
         """Create the database schema if it does not already exist."""
-        conn, cursor = self.open()
-        try:
-            try:
-                cursor.execute("SHOW TABLES LIKE 'object_state'")
-                if not cursor.rowcount:
-                    self.create_schema(cursor)
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            cursor.execute("SHOW TABLES LIKE 'object_state'")
+            if not cursor.rowcount:
+                self.create_schema(cursor)
+        self._open_and_call(callback)
 
-
     def zap_all(self):
         """Clear all data out of the database."""
-        conn, cursor = self.open()
-        try:
-            try:
-                stmt = """
-                DELETE FROM object_refs_added;
-                DELETE FROM object_ref;
-                DELETE FROM current_object;
-                DELETE FROM object_state;
-                TRUNCATE new_oid;
-                DELETE FROM transaction;
-                -- Create a transaction to represent object creation.
-                INSERT INTO transaction (tid, username, description) VALUES
-                    (0, 'system', 'special transaction for object creation');
-                """
-                self._run_script(cursor, stmt)
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            stmt = """
+            DELETE FROM object_refs_added;
+            DELETE FROM object_ref;
+            DELETE FROM current_object;
+            DELETE FROM object_state;
+            TRUNCATE new_oid;
+            DELETE FROM transaction;
+            -- Create a transaction to represent object creation.
+            INSERT INTO transaction (tid, username, description) VALUES
+                (0, 'system', 'special transaction for object creation');
+            """
+            self._run_script(cursor, stmt)
+        self._open_and_call(callback)
 
-
     def drop_all(self):
         """Drop all tables and sequences."""
-        conn, cursor = self.open()
-        try:
-            try:
-                for tablename in ('pack_state_tid', 'pack_state',
-                        'pack_object', 'object_refs_added', 'object_ref',
-                        'current_object', 'object_state', 'new_oid',
-                        'transaction'):
-                    cursor.execute("DROP TABLE IF EXISTS %s" % tablename)
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            for tablename in ('pack_state_tid', 'pack_state',
+                    'pack_object', 'object_refs_added', 'object_ref',
+                    'current_object', 'object_state', 'new_oid',
+                    'transaction'):
+                cursor.execute("DROP TABLE IF EXISTS %s" % tablename)
+        self._open_and_call(callback)
 
-
     def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
         """Open a database connection and return (conn, cursor)."""
         try:

Modified: relstorage/branches/1.1/relstorage/adapters/oracle.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/oracle.py	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/adapters/oracle.py	2009-01-27 02:40:19 UTC (rev 95065)
@@ -15,6 +15,8 @@
 
 import logging
 import re
+import time
+
 import cx_Oracle
 from ZODB.POSException import StorageError
 
@@ -263,77 +265,53 @@
         );
         """
         self._run_script(cursor, stmt)
+        # Let Oracle catch up with the new data definitions by sleeping.
+        # This reduces the likelihood of spurious ORA-01466 errors.
+        time.sleep(5)
 
 
     def prepare_schema(self):
         """Create the database schema if it does not already exist."""
-        conn, cursor = self.open()
-        try:
-            try:
-                cursor.execute("""
-                SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
-                """)
-                if not cursor.fetchall():
-                    self.create_schema(cursor)
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            cursor.execute("""
+            SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
+            """)
+            if not cursor.fetchall():
+                self.create_schema(cursor)
+        self._open_and_call(callback)
 
-
     def zap_all(self):
         """Clear all data out of the database."""
-        conn, cursor = self.open()
-        try:
-            try:
-                stmt = """
-                DELETE FROM object_refs_added;
-                DELETE FROM object_ref;
-                DELETE FROM current_object;
-                DELETE FROM object_state;
-                DELETE FROM transaction;
-                -- Create a transaction to represent object creation.
-                INSERT INTO transaction (tid, username, description) VALUES
-                    (0, UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
-                    UTL_I18N.STRING_TO_RAW(
-                    'special transaction for object creation', 'US7ASCII'));
-                DROP SEQUENCE zoid_seq;
-                CREATE SEQUENCE zoid_seq;
-                """
-                self._run_script(cursor, stmt)
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            stmt = """
+            DELETE FROM object_refs_added;
+            DELETE FROM object_ref;
+            DELETE FROM current_object;
+            DELETE FROM object_state;
+            DELETE FROM transaction;
+            -- Create a transaction to represent object creation.
+            INSERT INTO transaction (tid, username, description) VALUES
+                (0, UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
+                UTL_I18N.STRING_TO_RAW(
+                'special transaction for object creation', 'US7ASCII'));
+            DROP SEQUENCE zoid_seq;
+            CREATE SEQUENCE zoid_seq;
+            """
+            self._run_script(cursor, stmt)
+        self._open_and_call(callback)
 
-
     def drop_all(self):
         """Drop all tables and sequences."""
-        conn, cursor = self.open()
-        try:
-            try:
-                for tablename in ('pack_state_tid', 'pack_state',
-                        'pack_object', 'object_refs_added', 'object_ref',
-                        'current_object', 'object_state', 'transaction',
-                        'commit_lock', 'pack_lock',
-                        'temp_store', 'temp_undo', 'temp_pack_visit'):
-                    cursor.execute("DROP TABLE %s" % tablename)
-                cursor.execute("DROP SEQUENCE zoid_seq")
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            for tablename in ('pack_state_tid', 'pack_state',
+                    'pack_object', 'object_refs_added', 'object_ref',
+                    'current_object', 'object_state', 'transaction',
+                    'commit_lock', 'pack_lock',
+                    'temp_store', 'temp_undo', 'temp_pack_visit'):
+                cursor.execute("DROP TABLE %s" % tablename)
+            cursor.execute("DROP SEQUENCE zoid_seq")
+        self._open_and_call(callback)
 
-
     def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
             twophase=False):
         """Open a database connection and return (conn, cursor)."""

Modified: relstorage/branches/1.1/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/postgresql.py	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/adapters/postgresql.py	2009-01-27 02:40:19 UTC (rev 95065)
@@ -142,73 +142,46 @@
 
     def prepare_schema(self):
         """Create the database schema if it does not already exist."""
-        conn, cursor = self.open()
-        try:
-            try:
-                cursor.execute("""
-                SELECT tablename
-                FROM pg_tables
-                WHERE tablename = 'object_state'
-                """)
-                if not cursor.rowcount:
-                    self.create_schema(cursor)
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            cursor.execute("""
+            SELECT tablename
+            FROM pg_tables
+            WHERE tablename = 'object_state'
+            """)
+            if not cursor.rowcount:
+                self.create_schema(cursor)
+        self._open_and_call(callback)
 
-
     def zap_all(self):
         """Clear all data out of the database."""
-        conn, cursor = self.open()
-        try:
-            try:
-                cursor.execute("""
-                DELETE FROM object_refs_added;
-                DELETE FROM object_ref;
-                DELETE FROM current_object;
-                DELETE FROM object_state;
-                DELETE FROM transaction;
-                -- Create a special transaction to represent object creation.
-                INSERT INTO transaction (tid, username, description) VALUES
-                    (0, 'system', 'special transaction for object creation');
-                ALTER SEQUENCE zoid_seq START WITH 1;
-                """)
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            cursor.execute("""
+            DELETE FROM object_refs_added;
+            DELETE FROM object_ref;
+            DELETE FROM current_object;
+            DELETE FROM object_state;
+            DELETE FROM transaction;
+            -- Create a special transaction to represent object creation.
+            INSERT INTO transaction (tid, username, description) VALUES
+                (0, 'system', 'special transaction for object creation');
+            ALTER SEQUENCE zoid_seq START WITH 1;
+            """)
+        self._open_and_call(callback)
 
-
     def drop_all(self):
         """Drop all tables and sequences."""
-        conn, cursor = self.open()
-        try:
-            try:
-                cursor.execute("SELECT tablename FROM pg_tables")
-                existent = set([name for (name,) in cursor])
-                for tablename in ('pack_state_tid', 'pack_state',
-                        'pack_object', 'object_refs_added', 'object_ref',
-                        'current_object', 'object_state', 'transaction',
-                        'commit_lock', 'pack_lock'):
-                    if tablename in existent:
-                        cursor.execute("DROP TABLE %s" % tablename)
-                cursor.execute("DROP SEQUENCE zoid_seq")
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
+        def callback(conn, cursor):
+            cursor.execute("SELECT tablename FROM pg_tables")
+            existent = set([name for (name,) in cursor])
+            for tablename in ('pack_state_tid', 'pack_state',
+                    'pack_object', 'object_refs_added', 'object_ref',
+                    'current_object', 'object_state', 'transaction',
+                    'commit_lock', 'pack_lock'):
+                if tablename in existent:
+                    cursor.execute("DROP TABLE %s" % tablename)
+            cursor.execute("DROP SEQUENCE zoid_seq")
+        self._open_and_call(callback)
 
-
     def open(self,
             isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
         """Open a database connection and return (conn, cursor)."""
@@ -278,12 +251,10 @@
 
     def get_db_size(self):
         """Returns the approximate size of the database in bytes"""
-        conn, cursor = self.open()
-        try:
+        def callback(conn, cursor):
             cursor.execute("SELECT pg_database_size(current_database())")
             return cursor.fetchone()[0]
-        finally:
-            self.close(conn, cursor)
+        return self._open_and_call(callback)
 
     def get_current_tid(self, cursor, oid):
         """Returns the current integer tid for an object.

Modified: relstorage/branches/1.1/relstorage/component.xml
===================================================================
--- relstorage/branches/1.1/relstorage/component.xml	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/component.xml	2009-01-27 02:40:19 UTC (rev 95065)
@@ -26,18 +26,26 @@
     </key>
     <key name="poll-interval" datatype="float" required="no">
       <description>
-        Defer polling the database for the specified maximum time interval.
-        Set to 0 (the default) to always poll.  Fractional seconds are
-        allowed.
+        Defer polling the database for the specified maximum time interval,
+        in seconds.  Set to 0 (the default) to always poll.  Fractional
+        seconds are allowed.  Use this to lighten the database load on
+        servers with high read volume and low write volume.
 
-        Use this to lighten the database load on servers with high read
-        volume and low write volume.  A setting of 1-5 seconds is sufficient
-        for most systems.
+        The poll-interval option works best in conjunction with
+        the cache-servers option.  If both are enabled, RelStorage will
+        poll a single cache key for changes on every request.
+        The database will not be polled unless the cache indicates
+        there have been changes, or the timeout specified by poll-interval
+        has expired.  This configuration keeps clients fully up to date,
+        while removing much of the polling burden from the database.
+        A good cluster configuration is to use memcache servers
+        and a high poll-interval (say, 60 seconds).
 
-        While this setting should not affect database integrity,
-        it increases the probability of basing transactions on stale data,
-        leading to conflicts.  Thus a nonzero setting can hurt
-        the performance of servers with high write volume.
+        This option can be used without the cache-servers option,
+        but a large poll-interval without cache-servers increases the
+        likelihood of basing transactions on stale data.  Stale data
+        does not affect database consistency, but it does raise the
+        chance of conflict errors, hurting performance.
       </description>
     </key>
     <key name="pack-gc" datatype="boolean" default="true">

Modified: relstorage/branches/1.1/relstorage/relstorage.py
===================================================================
--- relstorage/branches/1.1/relstorage/relstorage.py	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/relstorage.py	2009-01-27 02:40:19 UTC (rev 95065)
@@ -664,6 +664,15 @@
         txn = self._prepared_txn
         assert txn is not None
         self._adapter.commit_phase2(self._store_cursor, txn)
+        cache = self._cache_client
+        if cache is not None:
+            if cache.incr('commit_count') is None:
+                # Use the current time as an initial commit_count value.
+                cache.add('commit_count', int(time.time()))
+                # A concurrent committer could have won the race to set the
+                # initial commit_count.  Increment commit_count so that it
+                # doesn't matter who won.
+                cache.incr('commit_count')
         self._prepared_txn = None
         self._ltid = self._tid
         self._tid = None
@@ -820,7 +829,7 @@
             self._lock_release()
 
 
-    def pack(self, t, referencesf):
+    def pack(self, t, referencesf, sleep=time.sleep):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
 
@@ -867,7 +876,7 @@
                     log.info("pack: dry run complete")
                 else:
                     # Now pack.
-                    adapter.pack(tid_int, self._options)
+                    adapter.pack(tid_int, self._options, sleep=sleep)
                     self._after_pack()
             finally:
                 adapter.release_pack_lock(lock_cursor)
@@ -901,6 +910,10 @@
             options=parent._options)
         # _prev_polled_tid contains the tid at the previous poll
         self._prev_polled_tid = None
+        # _commit_count contains the last polled value of the
+        # 'commit_count' cache key
+        self._commit_count = 0
+        # _poll_at is the time to poll regardless of commit_count
         self._poll_at = 0
 
     def _get_oid_cache_key(self, oid_int):
@@ -925,6 +938,30 @@
         finally:
             self._lock_release()
 
+    def need_poll(self):
+        """Return true if polling is needed"""
+        now = time.time()
+
+        cache = self._cache_client
+        if cache is not None:
+            new_commit_count = cache.get('commit_count')
+            if new_commit_count != self._commit_count:
+                # There is new data ready to poll
+                self._commit_count = new_commit_count
+                self._poll_at = now
+                return True
+
+        if not self._load_transaction_open:
+            # Since the load connection is closed or does not have
+            # a transaction in progress, polling is required.
+            return True
+
+        if now >= self._poll_at:
+            # The poll timeout has expired
+            return True
+
+        return False
+
     def poll_invalidations(self):
         """Looks for OIDs of objects that changed since _prev_polled_tid
 
@@ -938,14 +975,10 @@
                 return {}
 
             if self._options.poll_interval:
-                now = time.time()
-                if self._load_transaction_open and now < self._poll_at:
-                    # It's not yet time to poll again.  The previous load
-                    # transaction is still open, so it's safe to
-                    # ignore this poll.
+                if not self.need_poll():
                     return {}
-                # else poll now after resetting the timeout
-                self._poll_at = now + self._options.poll_interval
+                # reset the timeout
+                self._poll_at = time.time() + self._options.poll_interval
 
             self._restart_load()
             conn = self._load_conn

Modified: relstorage/branches/1.1/relstorage/tests/fakecache.py
===================================================================
--- relstorage/branches/1.1/relstorage/tests/fakecache.py	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/tests/fakecache.py	2009-01-27 02:40:19 UTC (rev 95065)
@@ -14,14 +14,27 @@
 """A memcache-like module sufficient for testing without an actual memcache.
 """
 
+data = {}
+
 class Client(object):
 
     def __init__(self, servers):
         self.servers = servers
-        self.data = {}
 
     def get(self, key):
-        return self.data.get(key)
+        return data.get(key)
 
     def set(self, key, value):
-        self.data[key] = value
+        data[key] = value
+
+    def add(self, key, value):
+        if key not in data:
+            data[key] = value
+
+    def incr(self, key):
+        value = data.get(key)
+        if value is None:
+            return None
+        value = int(value) + 1
+        data[key] = value
+        return value

Modified: relstorage/branches/1.1/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/branches/1.1/relstorage/tests/reltestbase.py	2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/tests/reltestbase.py	2009-01-27 02:40:19 UTC (rev 95065)
@@ -16,6 +16,7 @@
 import itertools
 import time
 from relstorage.relstorage import RelStorage
+from relstorage.tests import fakecache
 
 from ZODB.DB import DB
 from ZODB.utils import p64
@@ -42,13 +43,14 @@
 
     def open(self, **kwargs):
         adapter = self.make_adapter()
-        self._storage = RelStorage(adapter, **kwargs)
+        self._storage = RelStorage(adapter, pack_gc=True, **kwargs)
 
     def setUp(self):
         self.open(create=1)
         self._storage.zap_all()
 
     def tearDown(self):
+        transaction.abort()
         self._storage.close()
         self._storage.cleanup()
 
@@ -231,25 +233,32 @@
     def checkLoadFromCache(self):
         # Store an object, cache it, then retrieve it from the cache
         self._storage._options.cache_servers = 'x:1 y:2'
-        self._storage._options.cache_module_name = 'relstorage.tests.fakecache'
+        self._storage._options.cache_module_name = fakecache.__name__
+        fakecache.data.clear()
 
         db = DB(self._storage)
         try:
             c1 = db.open()
-            cache = c1._storage._cache_client
-            self.assertEqual(cache.servers, ['x:1', 'y:2'])
-            self.assertEqual(len(cache.data), 0)
+            self.assertEqual(c1._storage._cache_client.servers, ['x:1', 'y:2'])
+            self.assertEqual(len(fakecache.data), 0)
             r1 = c1.root()
-            self.assertEqual(len(cache.data), 2)
+            # the root tid and state should now be cached
+            self.assertEqual(len(fakecache.data), 2)
             r1['alpha'] = PersistentMapping()
+            self.assertFalse('commit_count' in fakecache.data)
             transaction.commit()
+            self.assertTrue('commit_count' in fakecache.data)
+            self.assertEqual(len(fakecache.data), 3)
             oid = r1['alpha']._p_oid
 
-            self.assertEqual(len(cache.data), 2)
-            got, serialno = c1._storage.load(oid, '')
-            self.assertEqual(len(cache.data), 4)
-            # load the object from the cache
-            got, serialno = c1._storage.load(oid, '')
+            got, serial = c1._storage.load(oid, '')
+            # another tid and state should now be cached
+            self.assertEqual(len(fakecache.data), 5)
+
+            # load the object via loadSerial()
+            got2 = c1._storage.loadSerial(oid, serial)
+            self.assertEqual(got, got2)
+
             # try to load an object that doesn't exist
             self.assertRaises(KeyError, c1._storage.load, 'bad.oid.', '')
         finally:
@@ -291,7 +300,7 @@
         finally:
             db.close()
 
-    def checkPollInterval(self):
+    def checkPollInterval(self, using_cache=False):
         # Verify the poll_interval parameter causes RelStorage to
         # delay invalidation polling.
         self._storage._options.poll_interval = 3600
@@ -318,9 +327,19 @@
             # flush invalidations to c2, but the poll timer has not
             # yet expired, so the change to r2 should not be seen yet.
             self.assertTrue(c2._storage._poll_at > 0)
-            c2._flush_invalidations()
-            r2 = c2.root()
-            self.assertEqual(r2['alpha'], 1)
+            if using_cache:
+                # The cache reveals that a poll is needed even though
+                # the poll timeout has not expired.
+                self.assertTrue(c2._storage.need_poll())
+                c2._flush_invalidations()
+                r2 = c2.root()
+                self.assertEqual(r2['alpha'], 2)
+                self.assertFalse(c2._storage.need_poll())
+            else:
+                self.assertFalse(c2._storage.need_poll())
+                c2._flush_invalidations()
+                r2 = c2.root()
+                self.assertEqual(r2['alpha'], 1)
 
             # expire the poll timer and verify c2 sees the change
             c2._storage._poll_at -= 3601
@@ -335,7 +354,13 @@
         finally:
             db.close()
 
+    def checkPollIntervalWithCache(self):
+        self._storage._options.cache_servers = 'x:1'
+        self._storage._options.cache_module_name = fakecache.__name__
+        fakecache.data.clear()
+        self.checkPollInterval(using_cache=True)
 
+
     def checkTransactionalUndoIterator(self):
         # this test overrides the broken version in TransactionalUndoStorage.
 
@@ -452,7 +477,7 @@
         finally:
             db.close()
 
-    def checkPackGC(self, gc_enabled=True):
+    def checkPackGC(self, expect_object_deleted=True):
         db = DB(self._storage)
         try:
             c1 = db.open()
@@ -473,7 +498,7 @@
                 packtime = time.time()
             self._storage.pack(packtime, referencesf)
 
-            if gc_enabled:
+            if expect_object_deleted:
                 # The object should now be gone
                 self.assertRaises(KeyError, self._storage.load, oid, '')
             else:
@@ -484,8 +509,12 @@
 
     def checkPackGCDisabled(self):
         self._storage._options.pack_gc = False
-        self.checkPackGC(gc_enabled=False)
+        self.checkPackGC(expect_object_deleted=False)
 
+    def checkPackGCDryRun(self):
+        self._storage._options.pack_dry_run = True
+        self.checkPackGC(expect_object_deleted=False)
+
     def checkPackOldUnreferenced(self):
         db = DB(self._storage)
         try:
@@ -515,6 +544,27 @@
 
         finally:
             db.close()
+
+    def checkPackDutyCycle(self):
+        # Exercise the code in the pack algorithm that releases the
+        # commit lock for a time to allow concurrent transactions to commit.
+        self._storage._options.pack_batch_timeout = 0  # pause after every txn
+
+        slept = []
+        def sim_sleep(seconds):
+            slept.append(seconds)
+
+        db = DB(self._storage)
+        try:
+            # Pack
+            now = packtime = time.time()
+            while packtime <= now:
+                packtime = time.time()
+            self._storage.pack(packtime, referencesf, sleep=sim_sleep)
+
+            self.assertEquals(len(slept), 1)
+        finally:
+            db.close()
         
 
 



More information about the Checkins mailing list