[Checkins] SVN: relstorage/trunk/ Fixed issues:

Shane Hathaway shane at hathawaymix.org
Sat Jul 31 05:31:34 EDT 2010


Log message for revision 115251:
  Fixed issues:
  
  - Always update the RelStorage cache when opening a database connection for
    loading, even when no ZODB Connection is using the storage.  Otherwise,
    code that used the storage interface directly could cause the cache
    to fall out of sync; the effects would be seen in the next
    ZODB.Connection.
  
  - Added a ZODB monkey patch that passes the "force" parameter to the
    sync method.  This should help the poll-interval option do its job
    better.
  
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/__init__.py
  U   relstorage/trunk/relstorage/adapters/poller.py
  U   relstorage/trunk/relstorage/cache.py
  U   relstorage/trunk/relstorage/options.py
  U   relstorage/trunk/relstorage/storage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py
  U   relstorage/trunk/setup.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/CHANGES.txt	2010-07-31 09:31:33 UTC (rev 115251)
@@ -1,4 +1,18 @@
 
+1.4.0c3 (2010-07-31)
+--------------------
+
+- Always update the RelStorage cache when opening a database connection for
+  loading, even when no ZODB Connection is using the storage.  Otherwise,
+  code that used the storage interface directly could cause the cache
+  to fall out of sync; the effects would be seen in the next
+  ZODB.Connection.
+
+- Added a ZODB monkey patch that passes the "force" parameter to the
+  sync method.  This should help the poll-interval option do its job
+  better.
+
+
 1.4.0c2 (2010-07-28)
 --------------------
 

Modified: relstorage/trunk/relstorage/__init__.py
===================================================================
--- relstorage/trunk/relstorage/__init__.py	2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/__init__.py	2010-07-31 09:31:33 UTC (rev 115251)
@@ -28,3 +28,34 @@
         pass
 
 check_compatible()
+
+
+def patch_zodb_sync():
+    """Patch Connection.sync() and afterCompletion() to pass the 'force' flag.
+    """
+
+    def _storage_sync(self, *ignored, **kw):
+        sync = getattr(self._storage, 'sync', 0)
+        if sync:
+            # By default, do not force the sync, allowing RelStorage
+            # to ignore sync requests for a while.
+            force = kw.get('force', False)
+            try:
+                sync(force=force)
+            except TypeError:
+                # The 'force' parameter is not accepted.
+                sync()
+        self._flush_invalidations()
+
+    def sync(self):
+        """Manually update the view on the database."""
+        self.transaction_manager.abort()
+        self._storage_sync(force=True)
+
+    from ZODB.Connection import Connection
+    Connection._storage_sync = _storage_sync
+    Connection.afterCompletion = _storage_sync
+    Connection.newTransaction = _storage_sync
+    Connection.sync = sync
+
+patch_zodb_sync()

Modified: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py	2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/adapters/poller.py	2010-07-31 09:31:33 UTC (rev 115251)
@@ -37,13 +37,19 @@
 
         Returns (changes, new_polled_tid), where changes is either
         a list of (oid, tid) that have changed, or None to indicate
-        that the changes are too complex to list.  new_polled_tid is
-        never None.
+        that the changes are too complex to list.  new_polled_tid can be
+        0 if there is no data in the database.
         """
         # find out the tid of the most recent transaction.
         cursor.execute(self.poll_query)
-        new_polled_tid = cursor.fetchone()[0]
-        assert new_polled_tid is not None
+        rows = list(cursor)
+        if not rows:
+            # No data.
+            return None, 0
+        new_polled_tid = rows[0][0]
+        if not new_polled_tid:
+            # No data.
+            return None, 0
 
         if prev_polled_tid is None:
             # This is the first time the connection has polled.

Modified: relstorage/trunk/relstorage/cache.py
===================================================================
--- relstorage/trunk/relstorage/cache.py	2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/cache.py	2010-07-31 09:31:33 UTC (rev 115251)
@@ -102,8 +102,11 @@
 
     def new_instance(self):
         """Return a copy of this instance sharing the same local client"""
-        local_client = self.clients_local_first[0]
-        return StorageCache(self.adapter, self.options, local_client)
+        if self.options.share_local_cache:
+            local_client = self.clients_local_first[0]
+            return StorageCache(self.adapter, self.options, local_client)
+        else:
+            return StorageCache(self.adapter, self.options)
 
     def clear(self):
         """Remove all data from the cache.  Called by speed tests."""

Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py	2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/options.py	2010-07-31 09:31:33 UTC (rev 115251)
@@ -46,6 +46,11 @@
         self.commit_lock_id = 0
         self.strict_tpc = default_strict_tpc
 
+        # If share_local_cache is off, each storage instance has a private
+        # cache rather than a shared cache.  This option exists mainly for
+        # simulating disconnected caches in tests.
+        self.share_local_cache = True
+
         for key, value in kwoptions.iteritems():
             if key in self.__dict__:
                 setattr(self, key, value)

Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py	2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/storage.py	2010-07-31 09:31:33 UTC (rev 115251)
@@ -215,31 +215,21 @@
                 raise
             self._load_transaction_open = False
 
-    def _restart_load(self):
-        """Restart the load connection, creating a new connection if needed"""
-        if self._load_cursor is None:
-            self._open_load_connection()
-            return
-        try:
-            self._adapter.connmanager.restart_load(
-                self._load_conn, self._load_cursor)
-            self._load_transaction_open = True
-        except self._adapter.connmanager.disconnected_exceptions, e:
-            log.warning("Reconnecting load_conn: %s", e)
-            self._drop_load_connection()
-            try:
-                self._open_load_connection()
-            except:
-                log.exception("Reconnect failed.")
-                raise
-            else:
-                log.info("Reconnected.")
+    def _restart_load_and_call(self, f, *args, **kw):
+        """Restart the load connection and call a function.
 
-    def _with_load(self, f, *args, **kw):
-        """Call a function with the load connection and cursor."""
+        The first two function parameters are the load connection and cursor.
+        """
         if self._load_cursor is None:
+            need_restart = False
             self._open_load_connection()
+        else:
+            need_restart = True
         try:
+            if need_restart:
+                self._adapter.connmanager.restart_load(
+                    self._load_conn, self._load_cursor)
+                self._load_transaction_open = True
             return f(self._load_conn, self._load_cursor, *args, **kw)
         except self._adapter.connmanager.disconnected_exceptions, e:
             log.warning("Reconnecting load_conn: %s", e)
@@ -452,7 +442,7 @@
         self._lock_acquire()
         try:
             if not self._load_transaction_open:
-                self._restart_load()
+                self._restart_load_and_poll()
             cursor = self._load_cursor
             state, tid_int = cache.load(cursor, oid_int)
         finally:
@@ -489,7 +479,7 @@
         self._lock_acquire()
         try:
             if not self._load_transaction_open:
-                self._restart_load()
+                self._restart_load_and_poll()
             state = self._adapter.mover.load_revision(
                 self._load_cursor, oid_int, tid_int)
             if state is None and self._store_cursor is not None:
@@ -520,7 +510,7 @@
                 cursor = self._store_cursor
             else:
                 if not self._load_transaction_open:
-                    self._restart_load()
+                    self._restart_load_and_poll()
                 cursor = self._load_cursor
             if not self._adapter.mover.exists(cursor, u64(oid)):
                 raise POSKeyError(oid)
@@ -740,8 +730,7 @@
                 self._adapter.mover.replace_temp(
                     cursor, oid_int, prev_tid_int, data)
                 resolved.add(oid)
-                if cache is not None:
-                    cache.store_temp(oid_int, data)
+                cache.store_temp(oid_int, data)
 
         # Move the new states into the permanent table
         tid_int = u64(self._tid)
@@ -1220,6 +1209,27 @@
 
         return False
 
+    def _restart_load_and_poll(self):
+        """Call _restart_load, poll for changes, and update self._cache.
+        """
+        # Ignore changes made by the last transaction committed
+        # by this connection.
+        if self._ltid is not None:
+            ignore_tid = u64(self._ltid)
+        else:
+            ignore_tid = None
+        prev = self._prev_polled_tid
+
+        # get a list of changed OIDs and the most recent tid
+        changes, new_polled_tid = self._restart_load_and_call(
+            self._adapter.poller.poll_invalidations, prev, ignore_tid)
+
+        # Inform the cache of the changes.
+        self._cache.after_poll(
+            self._load_cursor, prev, new_polled_tid, changes)
+
+        return changes, new_polled_tid
+
     def poll_invalidations(self):
         """Looks for OIDs of objects that changed since _prev_polled_tid
 
@@ -1238,23 +1248,8 @@
                 # reset the timeout
                 self._poll_at = time.time() + self._options.poll_interval
 
-            self._restart_load()
+            changes, new_polled_tid = self._restart_load_and_poll()
 
-            # Ignore changes made by the last transaction committed
-            # by this connection.
-            if self._ltid is not None:
-                ignore_tid = u64(self._ltid)
-            else:
-                ignore_tid = None
-
-            # get a list of changed OIDs and the most recent tid
-            prev = self._prev_polled_tid
-            changes, new_polled_tid = self._with_load(
-                self._adapter.poller.poll_invalidations, prev, ignore_tid)
-
-            self._cache.after_poll(
-                self._load_cursor, prev, new_polled_tid, changes)
-
             self._prev_polled_tid = new_polled_tid
 
             if changes is None:

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2010-07-31 09:31:33 UTC (rev 115251)
@@ -334,65 +334,113 @@
         finally:
             db.close()
 
-    def checkPollInterval(self, using_cache=True):
+    def checkPollInterval(self, shared_cache=True):
         # Verify the poll_interval parameter causes RelStorage to
         # delay invalidation polling.
         self._storage._options.poll_interval = 3600
         db = DB(self._storage)
         try:
-            c1 = db.open()
+            tm1 = transaction.TransactionManager()
+            c1 = db.open(transaction_manager=tm1)
             r1 = c1.root()
             r1['alpha'] = 1
-            transaction.commit()
+            tm1.commit()
 
-            c2 = db.open()
+            tm2 = transaction.TransactionManager()
+            c2 = db.open(transaction_manager=tm2)
             r2 = c2.root()
             self.assertEqual(r2['alpha'], 1)
+            self.assertFalse(c2._storage.need_poll())
+            self.assertTrue(c2._storage._poll_at > 0)
 
             r1['alpha'] = 2
-            # commit c1 without triggering c2.afterCompletion().
-            storage = c1._storage
-            t = transaction.Transaction()
-            storage.tpc_begin(t)
-            c1.commit(t)
-            storage.tpc_vote(t)
-            storage.tpc_finish(t)
+            # commit c1 without committing c2.
+            tm1.commit()
 
-            # flush invalidations to c2, but the poll timer has not
-            # yet expired, so the change to r2 should not be seen yet.
-            self.assertTrue(c2._storage._poll_at > 0)
-            if using_cache:
+            if shared_cache:
                 # The cache reveals that a poll is needed even though
                 # the poll timeout has not expired.
                 self.assertTrue(c2._storage.need_poll())
-                c2._flush_invalidations()
+                tm2.commit()
                 r2 = c2.root()
                 self.assertEqual(r2['alpha'], 2)
                 self.assertFalse(c2._storage.need_poll())
             else:
-                # Now confirm that no poll is needed
+                # The poll timeout has not expired, so no poll should occur
+                # yet, even after a commit.
                 self.assertFalse(c2._storage.need_poll())
-                c2._flush_invalidations()
+                tm2.commit()
                 r2 = c2.root()
                 self.assertEqual(r2['alpha'], 1)
 
             # expire the poll timer and verify c2 sees the change
             c2._storage._poll_at -= 3601
-            c2._flush_invalidations()
+            tm2.commit()
             r2 = c2.root()
             self.assertEqual(r2['alpha'], 2)
 
-            transaction.abort()
             c2.close()
             c1.close()
 
         finally:
             db.close()
 
-    def checkPollIntervalWithoutCache(self):
-        self._storage._options.cache_local_mb = 0
-        self.checkPollInterval(using_cache=True)
+    def checkPollIntervalWithUnsharedCache(self):
+        self._storage._options.share_local_cache = False
+        self.checkPollInterval(shared_cache=False)
 
+    def checkCachePolling(self):
+        self._storage._options.poll_interval = 3600
+        self._storage._options.share_local_cache = False
+        db = DB(self._storage)
+        try:
+            # Set up the database.
+            tm1 = transaction.TransactionManager()
+            c1 = db.open(transaction_manager=tm1)
+            r1 = c1.root()
+            r1['obj'] = obj1 = PersistentMapping({'change': 0})
+            tm1.commit()
+
+            # Load and change the object in an independent connection.
+            tm2 = transaction.TransactionManager()
+            c2 = db.open(transaction_manager=tm2)
+            r2 = c2.root()
+            r2['obj']['change'] = 1
+            tm2.commit()
+            # Now c2 has delta_after0.
+            self.assertEqual(len(c2._storage._cache.delta_after0), 1)
+            c2.close()
+
+            # Change the object in the original connection.
+            c1.sync()
+            obj1['change'] = 2
+            tm1.commit()
+
+            # Close the database connection to c2.
+            c2._storage._drop_load_connection()
+
+            # Make the database connection to c2 reopen without polling.
+            c2._storage.load('\0' * 8, '')
+            self.assertTrue(c2._storage._load_transaction_open)
+
+            # Open a connection, which should be the same connection
+            # as c2.
+            c3 = db.open(transaction_manager=tm2)
+            self.assertTrue(c3 is c2)
+            self.assertEqual(len(c2._storage._cache.delta_after0), 1)
+
+            # Clear the caches (but not delta_after*)
+            c3._resetCache()
+            for client in c3._storage._cache.clients_local_first:
+                client.flush_all()
+
+            obj3 = c3.root()['obj']
+            # Should have loaded the new object.
+            self.assertEqual(obj3['change'], 2)
+
+        finally:
+            db.close()
+
     def checkDoubleCommitter(self):
         # Verify we can store an object that gets committed twice in
         # a single transaction.

Modified: relstorage/trunk/setup.py
===================================================================
--- relstorage/trunk/setup.py	2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/setup.py	2010-07-31 09:31:33 UTC (rev 115251)
@@ -13,7 +13,7 @@
 ##############################################################################
 """A backend for ZODB that stores pickles in a relational database."""
 
-VERSION = "1.4.0c2"
+VERSION = "1.4.0c3"
 
 # The choices for the Trove Development Status line:
 # Development Status :: 5 - Production/Stable



More information about the checkins mailing list