[Checkins] SVN: relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch This is the latest patch for ZODB 3.9

Shane Hathaway shane at hathawaymix.org
Fri Mar 6 04:38:55 EST 2009


Log message for revision 97555:
  This is the latest patch for ZODB 3.9
  

Changed:
  A   relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch

-=-
Added: relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch
===================================================================
--- relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch	                        (rev 0)
+++ relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch	2009-03-06 09:38:55 UTC (rev 97555)
@@ -0,0 +1,435 @@
+Index: src/ZODB/PollableMappingStorage.py
+===================================================================
+--- src/ZODB/PollableMappingStorage.py	(revision 0)
++++ src/ZODB/PollableMappingStorage.py	(revision 97553)
+@@ -0,0 +1,93 @@
++##############################################################################
++#
++# Copyright (c) Zope Corporation and Contributors.
++# All Rights Reserved.
++#
++# This software is subject to the provisions of the Zope Public License,
++# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
++# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
++# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
++# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
++# FOR A PARTICULAR PURPOSE
++#
++##############################################################################
++"""An extension of MappingStorage that depends on polling.
++
++Each Connection has its own view of the database.  Polling updates each
++connection's view.
++"""
++
++import time
++
++import BTrees
++from ZODB.interfaces import IStoragePollable
++from ZODB.MappingStorage import MappingStorage
++from ZODB.TimeStamp import TimeStamp
++from zope.interface import implements
++
++
++class PollableMappingStorage(MappingStorage):
++    implements(IStoragePollable)
++
++    propagate_invalidations = False
++
++    def __init__(self, name="Pollable Mapping Storage"):
++        MappingStorage.__init__(self, name=name)
++        # _polled_tid contains the transaction ID at the last poll.
++        self._polled_tid = ''
++
++    def bind_connection(self, connection):
++        """Returns a storage instance to be used by the given Connection.
++        """
++        return BoundStorage(self)
++
++    def connection_closing(self):
++        """Notifies the storage that a connection is closing.
++        """
++        pass
++
++    def poll_invalidations(self):
++        """Poll the storage for changes by other connections.
++        """
++        new_tid = self._transactions.maxKey()
++
++        if self._polled_tid:
++            if not self._transactions.has_key(self._polled_tid):
++                # This connection is so old that we can no longer enumerate
++                # all the changes.
++                self._polled_tid = new_tid
++                return None
++
++        changed_oids = set()
++        for tid, txn in self._transactions.items(
++                self._polled_tid, new_tid, excludemin=True, excludemax=False):
++            if txn.status == 'p':
++                # This transaction has been packed, so it is no longer
++                # possible to enumerate all changed oids.
++                self._polled_tid = new_tid
++                return None
++            if tid == self._ltid:
++                # ignore the transaction committed by this connection
++                continue
++
++            changes = txn.data
++            # pull in changes from the transaction log
++            for oid, value in changes.iteritems():
++                tid_data = self._data.get(oid)
++                if tid_data is None:
++                    tid_data = BTrees.OOBTree.OOBucket()
++                    self._data[oid] = tid_data
++                tid_data[tid] = changes[oid]
++            changed_oids.update(changes.keys())
++
++        self._polled_tid = new_tid
++        return list(changed_oids)
++
++
++class BoundStorage(PollableMappingStorage):
++    """A PollableMappingStorage used for a specific Connection."""
++
++    def __init__(self, common):
++        PollableMappingStorage.__init__(self, name=common.__name__)
++        # bound storages use the same transaction log as the common storage.
++        self._transactions = common._transactions
+Index: src/ZODB/Connection.py
+===================================================================
+--- src/ZODB/Connection.py	(revision 97553)
++++ src/ZODB/Connection.py	(working copy)
+@@ -94,8 +94,13 @@
+         # Multi-database support
+         self.connections = {self._db.database_name: self}
+ 
+-        self._normal_storage = self._storage = db.storage
+-        self.new_oid = db.storage.new_oid
++        storage = db.storage
++        m = getattr(storage, 'bind_connection', None)
++        if m is not None:
++            # Use a storage instance bound to this connection.
++            storage = m(self)
++        self._normal_storage = self._storage = storage
++        self.new_oid = storage.new_oid
+         self._savepoint_storage = None
+ 
+         # Do we need to join a txn manager?
+@@ -148,6 +153,12 @@
+         # in the cache on abort and in other connections on finish.
+         self._modified = []
+ 
++        # Allow the storage to decide whether invalidations should
++        # propagate between connections.  If the storage provides MVCC
++        # semantics, it is better to not propagate invalidations between
++        # connections.
++        self._propagate_invalidations = getattr(
++            self._storage, 'propagate_invalidations', True)
+ 
+         # _invalidated queues invalidate messages delivered from the DB
+         # _inv_lock prevents one thread from modifying the set while
+@@ -295,6 +306,11 @@
+         if self._opened:
+             self.transaction_manager.unregisterSynch(self)
+ 
++        # If the storage wants to know, tell it this connection is closing.
++        m = getattr(self._storage, 'connection_closing', None)
++        if m is not None:
++            m()
++
+         if primary:
+             for connection in self.connections.values():
+                 if connection is not self:
+@@ -323,6 +339,9 @@
+ 
+     def invalidate(self, tid, oids):
+         """Notify the Connection that transaction 'tid' invalidated oids."""
++        if not self._propagate_invalidations:
++            # The storage disabled inter-connection invalidation.
++            return
+         if self.before is not None:
+             # this is an historical connection.  Invalidations are irrelevant.
+             return
+@@ -460,8 +479,23 @@
+         self._registered_objects = []
+         self._creating.clear()
+ 
++    def _poll_invalidations(self):
++        """Poll and process object invalidations provided by the storage.
++        """
++        m = getattr(self._storage, 'poll_invalidations', None)
++        if m is not None:
++            # Poll the storage for invalidations.
++            invalidated = m()
++            if invalidated is None:
++                # special value: the transaction is so old that
++                # we need to flush the whole cache.
++                self._cache.invalidate(self._cache.cache_data.keys())
++            elif invalidated:
++                self._cache.invalidate(invalidated)
++
+     # Process pending invalidations.
+     def _flush_invalidations(self):
++        self._poll_invalidations()
+         self._inv_lock.acquire()
+         try:
+             # Non-ghostifiable objects may need to read when they are
+Index: src/ZODB/tests/testPollableMappingStorage.py
+===================================================================
+--- src/ZODB/tests/testPollableMappingStorage.py	(revision 0)
++++ src/ZODB/tests/testPollableMappingStorage.py	(revision 97553)
+@@ -0,0 +1,164 @@
++##############################################################################
++#
++# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
++# All Rights Reserved.
++#
++# This software is subject to the provisions of the Zope Public License,
++# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
++# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
++# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
++# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
++# FOR A PARTICULAR PURPOSE.
++#
++##############################################################################
++
++import unittest
++
++from persistent.mapping import PersistentMapping
++import transaction
++import ZODB.PollableMappingStorage
++from ZODB.DB import DB
++
++
++from ZODB.tests import (
++    BasicStorage,
++    HistoryStorage,
++    IteratorStorage,
++    MTStorage,
++    PackableStorage,
++    RevisionStorage,
++    StorageTestBase,
++    Synchronization,
++    )
++
++class PollableTests:
++
++    def checkCrossConnectionInvalidation(self):
++        # Verify connections see updated state at txn boundaries.
++        # This will fail if Connection doesn't poll for changes.
++        db = DB(self._storage)
++        try:
++            c1 = db.open()
++            r1 = c1.root()
++            r1['myobj'] = 'yes'
++            c2 = db.open()
++            r2 = c2.root()
++            self.assert_('myobj' not in r2)
++
++            storage = c1._storage
++            t = transaction.Transaction()
++            t.description = 'invalidation test'
++            storage.tpc_begin(t)
++            c1.commit(t)
++            storage.tpc_vote(t)
++            storage.tpc_finish(t)
++
++            self.assert_('myobj' not in r2)
++            c2.sync()
++            self.assert_('myobj' in r2)
++            self.assert_(r2['myobj'] == 'yes')
++        finally:
++            db.close()
++
++    def checkCrossConnectionIsolation(self):
++        # Verify MVCC isolates connections.
++        # This will fail if Connection doesn't poll for changes.
++        db = DB(self._storage)
++        try:
++            c1 = db.open()
++            r1 = c1.root()
++            r1['alpha'] = PersistentMapping()
++            r1['gamma'] = PersistentMapping()
++            transaction.commit()
++
++            # Open a second connection but don't load root['alpha'] yet
++            c2 = db.open()
++            r2 = c2.root()
++
++            r1['alpha']['beta'] = 'yes'
++
++            storage = c1._storage
++            t = transaction.Transaction()
++            t.description = 'isolation test 1'
++            storage.tpc_begin(t)
++            c1.commit(t)
++            storage.tpc_vote(t)
++            storage.tpc_finish(t)
++
++            # The second connection will now load root['alpha'], but due to
++            # MVCC, it should continue to see the old state.
++            self.assert_(r2['alpha']._p_changed is None)  # A ghost
++            self.assert_(not r2['alpha'])
++            self.assert_(r2['alpha']._p_changed == 0)
++
++            # make root['alpha'] visible to the second connection
++            c2.sync()
++
++            # Now it should be in sync
++            self.assert_(r2['alpha']._p_changed is None)  # A ghost
++            self.assert_(r2['alpha'])
++            self.assert_(r2['alpha']._p_changed == 0)
++            self.assert_(r2['alpha']['beta'] == 'yes')
++
++            # Repeat the test with root['gamma']
++            r1['gamma']['delta'] = 'yes'
++
++            storage = c1._storage
++            t = transaction.Transaction()
++            t.description = 'isolation test 2'
++            storage.tpc_begin(t)
++            c1.commit(t)
++            storage.tpc_vote(t)
++            storage.tpc_finish(t)
++
++            # The second connection will now load root[3], but due to MVCC,
++            # it should continue to see the old state.
++            self.assert_(r2['gamma']._p_changed is None)  # A ghost
++            self.assert_(not r2['gamma'])
++            self.assert_(r2['gamma']._p_changed == 0)
++
++            # make root[3] visible to the second connection
++            c2.sync()
++
++            # Now it should be in sync
++            self.assert_(r2['gamma']._p_changed is None)  # A ghost
++            self.assert_(r2['gamma'])
++            self.assert_(r2['gamma']._p_changed == 0)
++            self.assert_(r2['gamma']['delta'] == 'yes')
++        finally:
++            db.close()
++    
++
++class PollableMappingStorageTests(
++    StorageTestBase.StorageTestBase,
++    BasicStorage.BasicStorage,
++
++    HistoryStorage.HistoryStorage,
++    IteratorStorage.ExtendedIteratorStorage,
++    IteratorStorage.IteratorStorage,
++    MTStorage.MTStorage,
++    PackableStorage.PackableStorageWithOptionalGC,
++    RevisionStorage.RevisionStorage,
++    Synchronization.SynchronizedStorage,
++    PollableTests
++    ):
++
++    def setUp(self):
++        self._storage = ZODB.PollableMappingStorage.PollableMappingStorage()
++
++    def tearDown(self):
++        self._storage.close()
++
++    def checkLoadBeforeUndo(self):
++        pass # we don't support undo yet
++    checkUndoZombie = checkLoadBeforeUndo
++
++
++def test_suite():
++    suite = unittest.makeSuite(PollableMappingStorageTests, 'check')
++    return suite
++
++if __name__ == "__main__":
++    loader = unittest.TestLoader()
++    loader.testMethodPrefix = "check"
++    unittest.main(testLoader=loader)
+Index: src/ZODB/MappingStorage.py
+===================================================================
+--- src/ZODB/MappingStorage.py	(revision 97553)
++++ src/ZODB/MappingStorage.py	(working copy)
+@@ -37,7 +37,7 @@
+     def __init__(self, name='MappingStorage'):
+         self.__name__ = name
+         self._data = {}                               # {oid->{tid->pickle}}
+-        self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
++        self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
+         self._ltid = None
+         self._last_pack = None
+         _lock = threading.RLock()
+Index: src/ZODB/interfaces.py
+===================================================================
+--- src/ZODB/interfaces.py	(revision 97553)
++++ src/ZODB/interfaces.py	(working copy)
+@@ -953,6 +953,56 @@
+         # DB pass-through
+ 
+ 
++class IStoragePollable(Interface):
++    """A storage that can be polled for changes."""
++
++    def bind_connection(connection):
++        """Returns a storage instance to be used by the given Connection.
++
++        This method is optional.  By implementing this method, a storage
++        instance can maintain Connection-specific state.
++
++        If this method is not provided, all connections to the same database
++        use the same storage instance (even across threads).
++        """
++
++    propagate_invalidations = Attribute(
++        """A boolean value indicating whether invalidations should propagate.
++
++        ZODB normally sends invalidation notifications between
++        Connection objects within a Python process.  If this
++        attribute is false, no such invalidations will be sent.
++        Cross-connection invalidation should normally be enabled, but
++        it adds unnecessary complexity to storages that expect the connection
++        to poll for invalidations instead.
++
++        If this attribute is not present, it is assumed to be true.
++        """)
++
++    def connection_closing():
++        """Notifies the storage that a connection is closing.
++
++        This method is optional.  This method is useful when
++        bind_connection() provides Connection-specific storage instances.
++        It lets the storage release resources.
++        """
++
++    def poll_invalidations():
++        """Poll the storage for external changes.
++
++        This method is optional.  This method is useful when
++        bind_connection() provides Connection-specific storage instances.
++
++        Returns either a sequence of OIDs that have changed, or None.  When a
++        sequence is returned, the corresponding objects should be removed
++        from the ZODB in-memory cache.  When None is returned, the storage is
++        indicating that so much time has elapsed since the last poll that it
++        is no longer possible to enumerate all of the changed OIDs, since the
++        previous transaction seen by the connection has already been packed.
++        In that case, the ZODB in-memory cache should be cleared.
++        """
++
++
+ class IStorageCurrentRecordIteration(IStorage):
+ 
+     def record_iternext(next=None):
+Index: src/ZODB/DB.py
+===================================================================
+--- src/ZODB/DB.py	(revision 97553)
++++ src/ZODB/DB.py	(working copy)
+@@ -456,6 +456,10 @@
+             storage.store(z64, None, file.getvalue(), '', t)
+             storage.tpc_vote(t)
+             storage.tpc_finish(t)
++        if hasattr(storage, 'connection_closing'):
++            # Let the storage release whatever resources it used for loading
++            # the root object.
++            storage.connection_closing()
+ 
+         # Multi-database setup.
+         if databases is None:



More information about the Checkins mailing list