[Checkins] SVN: relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch This is the latest patch for ZODB 3.9
Shane Hathaway
shane at hathawaymix.org
Fri Mar 6 04:38:55 EST 2009
Log message for revision 97555:
This is the latest patch for ZODB 3.9
Changed:
A relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch
-=-
Added: relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch
===================================================================
--- relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch (rev 0)
+++ relstorage/trunk/poll-invalidation-1-zodb-3-9-0a12.patch 2009-03-06 09:38:55 UTC (rev 97555)
@@ -0,0 +1,435 @@
+Index: src/ZODB/PollableMappingStorage.py
+===================================================================
+--- src/ZODB/PollableMappingStorage.py (revision 0)
++++ src/ZODB/PollableMappingStorage.py (revision 97553)
+@@ -0,0 +1,93 @@
++##############################################################################
++#
++# Copyright (c) Zope Corporation and Contributors.
++# All Rights Reserved.
++#
++# This software is subject to the provisions of the Zope Public License,
++# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
++# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
++# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
++# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
++# FOR A PARTICULAR PURPOSE
++#
++##############################################################################
++"""An extension of MappingStorage that depends on polling.
++
++Each Connection has its own view of the database. Polling updates each
++connection's view.
++"""
++
++import time
++
++import BTrees
++from ZODB.interfaces import IStoragePollable
++from ZODB.MappingStorage import MappingStorage
++from ZODB.TimeStamp import TimeStamp
++from zope.interface import implements
++
++
++class PollableMappingStorage(MappingStorage):
++ implements(IStoragePollable)
++
++ propagate_invalidations = False
++
++ def __init__(self, name="Pollable Mapping Storage"):
++ MappingStorage.__init__(self, name=name)
++ # _polled_tid contains the transaction ID at the last poll.
++ self._polled_tid = ''
++
++ def bind_connection(self, connection):
++ """Returns a storage instance to be used by the given Connection.
++ """
++ return BoundStorage(self)
++
++ def connection_closing(self):
++ """Notifies the storage that a connection is closing.
++ """
++ pass
++
++ def poll_invalidations(self):
++ """Poll the storage for changes by other connections.
++ """
++ new_tid = self._transactions.maxKey()
++
++ if self._polled_tid:
++ if not self._transactions.has_key(self._polled_tid):
++ # This connection is so old that we can no longer enumerate
++ # all the changes.
++ self._polled_tid = new_tid
++ return None
++
++ changed_oids = set()
++ for tid, txn in self._transactions.items(
++ self._polled_tid, new_tid, excludemin=True, excludemax=False):
++ if txn.status == 'p':
++ # This transaction has been packed, so it is no longer
++ # possible to enumerate all changed oids.
++ self._polled_tid = new_tid
++ return None
++ if tid == self._ltid:
++ # ignore the transaction committed by this connection
++ continue
++
++ changes = txn.data
++ # pull in changes from the transaction log
++ for oid, value in changes.iteritems():
++ tid_data = self._data.get(oid)
++ if tid_data is None:
++ tid_data = BTrees.OOBTree.OOBucket()
++ self._data[oid] = tid_data
++ tid_data[tid] = changes[oid]
++ changed_oids.update(changes.keys())
++
++ self._polled_tid = new_tid
++ return list(changed_oids)
++
++
++class BoundStorage(PollableMappingStorage):
++ """A PollableMappingStorage used for a specific Connection."""
++
++ def __init__(self, common):
++ PollableMappingStorage.__init__(self, name=common.__name__)
++ # bound storages use the same transaction log as the common storage.
++ self._transactions = common._transactions
+Index: src/ZODB/Connection.py
+===================================================================
+--- src/ZODB/Connection.py (revision 97553)
++++ src/ZODB/Connection.py (working copy)
+@@ -94,8 +94,13 @@
+ # Multi-database support
+ self.connections = {self._db.database_name: self}
+
+- self._normal_storage = self._storage = db.storage
+- self.new_oid = db.storage.new_oid
++ storage = db.storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
+ self._savepoint_storage = None
+
+ # Do we need to join a txn manager?
+@@ -148,6 +153,12 @@
+ # in the cache on abort and in other connections on finish.
+ self._modified = []
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ # _invalidated queues invalidate messages delivered from the DB
+ # _inv_lock prevents one thread from modifying the set while
+@@ -295,6 +306,11 @@
+ if self._opened:
+ self.transaction_manager.unregisterSynch(self)
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -323,6 +339,9 @@
+
+ def invalidate(self, tid, oids):
+ """Notify the Connection that transaction 'tid' invalidated oids."""
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
+ if self.before is not None:
+ # this is an historical connection. Invalidations are irrelevant.
+ return
+@@ -460,8 +479,23 @@
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+Index: src/ZODB/tests/testPollableMappingStorage.py
+===================================================================
+--- src/ZODB/tests/testPollableMappingStorage.py (revision 0)
++++ src/ZODB/tests/testPollableMappingStorage.py (revision 97553)
+@@ -0,0 +1,164 @@
++##############################################################################
++#
++# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
++# All Rights Reserved.
++#
++# This software is subject to the provisions of the Zope Public License,
++# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
++# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
++# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
++# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
++# FOR A PARTICULAR PURPOSE.
++#
++##############################################################################
++
++import unittest
++
++from persistent.mapping import PersistentMapping
++import transaction
++import ZODB.PollableMappingStorage
++from ZODB.DB import DB
++
++
++from ZODB.tests import (
++ BasicStorage,
++ HistoryStorage,
++ IteratorStorage,
++ MTStorage,
++ PackableStorage,
++ RevisionStorage,
++ StorageTestBase,
++ Synchronization,
++ )
++
++class PollableTests:
++
++ def checkCrossConnectionInvalidation(self):
++ # Verify connections see updated state at txn boundaries.
++ # This will fail if Connection doesn't poll for changes.
++ db = DB(self._storage)
++ try:
++ c1 = db.open()
++ r1 = c1.root()
++ r1['myobj'] = 'yes'
++ c2 = db.open()
++ r2 = c2.root()
++ self.assert_('myobj' not in r2)
++
++ storage = c1._storage
++ t = transaction.Transaction()
++ t.description = 'invalidation test'
++ storage.tpc_begin(t)
++ c1.commit(t)
++ storage.tpc_vote(t)
++ storage.tpc_finish(t)
++
++ self.assert_('myobj' not in r2)
++ c2.sync()
++ self.assert_('myobj' in r2)
++ self.assert_(r2['myobj'] == 'yes')
++ finally:
++ db.close()
++
++ def checkCrossConnectionIsolation(self):
++ # Verify MVCC isolates connections.
++ # This will fail if Connection doesn't poll for changes.
++ db = DB(self._storage)
++ try:
++ c1 = db.open()
++ r1 = c1.root()
++ r1['alpha'] = PersistentMapping()
++ r1['gamma'] = PersistentMapping()
++ transaction.commit()
++
++ # Open a second connection but don't load root['alpha'] yet
++ c2 = db.open()
++ r2 = c2.root()
++
++ r1['alpha']['beta'] = 'yes'
++
++ storage = c1._storage
++ t = transaction.Transaction()
++ t.description = 'isolation test 1'
++ storage.tpc_begin(t)
++ c1.commit(t)
++ storage.tpc_vote(t)
++ storage.tpc_finish(t)
++
++ # The second connection will now load root['alpha'], but due to
++ # MVCC, it should continue to see the old state.
++ self.assert_(r2['alpha']._p_changed is None) # A ghost
++ self.assert_(not r2['alpha'])
++ self.assert_(r2['alpha']._p_changed == 0)
++
++ # make root['alpha'] visible to the second connection
++ c2.sync()
++
++ # Now it should be in sync
++ self.assert_(r2['alpha']._p_changed is None) # A ghost
++ self.assert_(r2['alpha'])
++ self.assert_(r2['alpha']._p_changed == 0)
++ self.assert_(r2['alpha']['beta'] == 'yes')
++
++ # Repeat the test with root['gamma']
++ r1['gamma']['delta'] = 'yes'
++
++ storage = c1._storage
++ t = transaction.Transaction()
++ t.description = 'isolation test 2'
++ storage.tpc_begin(t)
++ c1.commit(t)
++ storage.tpc_vote(t)
++ storage.tpc_finish(t)
++
++ # The second connection will now load root[3], but due to MVCC,
++ # it should continue to see the old state.
++ self.assert_(r2['gamma']._p_changed is None) # A ghost
++ self.assert_(not r2['gamma'])
++ self.assert_(r2['gamma']._p_changed == 0)
++
++ # make root[3] visible to the second connection
++ c2.sync()
++
++ # Now it should be in sync
++ self.assert_(r2['gamma']._p_changed is None) # A ghost
++ self.assert_(r2['gamma'])
++ self.assert_(r2['gamma']._p_changed == 0)
++ self.assert_(r2['gamma']['delta'] == 'yes')
++ finally:
++ db.close()
++
++
++class PollableMappingStorageTests(
++ StorageTestBase.StorageTestBase,
++ BasicStorage.BasicStorage,
++
++ HistoryStorage.HistoryStorage,
++ IteratorStorage.ExtendedIteratorStorage,
++ IteratorStorage.IteratorStorage,
++ MTStorage.MTStorage,
++ PackableStorage.PackableStorageWithOptionalGC,
++ RevisionStorage.RevisionStorage,
++ Synchronization.SynchronizedStorage,
++ PollableTests
++ ):
++
++ def setUp(self):
++ self._storage = ZODB.PollableMappingStorage.PollableMappingStorage()
++
++ def tearDown(self):
++ self._storage.close()
++
++ def checkLoadBeforeUndo(self):
++ pass # we don't support undo yet
++ checkUndoZombie = checkLoadBeforeUndo
++
++
++def test_suite():
++ suite = unittest.makeSuite(PollableMappingStorageTests, 'check')
++ return suite
++
++if __name__ == "__main__":
++ loader = unittest.TestLoader()
++ loader.testMethodPrefix = "check"
++ unittest.main(testLoader=loader)
+Index: src/ZODB/MappingStorage.py
+===================================================================
+--- src/ZODB/MappingStorage.py (revision 97553)
++++ src/ZODB/MappingStorage.py (working copy)
+@@ -37,7 +37,7 @@
+ def __init__(self, name='MappingStorage'):
+ self.__name__ = name
+ self._data = {} # {oid->{tid->pickle}}
+- self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
++ self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
+ self._ltid = None
+ self._last_pack = None
+ _lock = threading.RLock()
+Index: src/ZODB/interfaces.py
+===================================================================
+--- src/ZODB/interfaces.py (revision 97553)
++++ src/ZODB/interfaces.py (working copy)
+@@ -953,6 +953,56 @@
+ # DB pass-through
+
+
++class IStoragePollable(Interface):
++ """A storage that can be polled for changes."""
++
++ def bind_connection(connection):
++ """Returns a storage instance to be used by the given Connection.
++
++ This method is optional. By implementing this method, a storage
++ instance can maintain Connection-specific state.
++
++ If this method is not provided, all connections to the same database
++ use the same storage instance (even across threads).
++ """
++
++ propagate_invalidations = Attribute(
++ """A boolean value indicating whether invalidations should propagate.
++
++ ZODB normally sends invalidation notifications between
++ Connection objects within a Python process. If this
++ attribute is false, no such invalidations will be sent.
++ Cross-connection invalidation should normally be enabled, but
++ it adds unnecessary complexity to storages that expect the connection
++ to poll for invalidations instead.
++
++ If this attribute is not present, it is assumed to be true.
++ """)
++
++ def connection_closing():
++ """Notifies the storage that a connection is closing.
++
++ This method is optional. This method is useful when
++ bind_connection() provides Connection-specific storage instances.
++ It lets the storage release resources.
++ """
++
++ def poll_invalidations():
++ """Poll the storage for external changes.
++
++ This method is optional. This method is useful when
++ bind_connection() provides Connection-specific storage instances.
++
++ Returns either a sequence of OIDs that have changed, or None. When a
++ sequence is returned, the corresponding objects should be removed
++ from the ZODB in-memory cache. When None is returned, the storage is
++ indicating that so much time has elapsed since the last poll that it
++ is no longer possible to enumerate all of the changed OIDs, since the
++ previous transaction seen by the connection has already been packed.
++ In that case, the ZODB in-memory cache should be cleared.
++ """
++
++
+ class IStorageCurrentRecordIteration(IStorage):
+
+ def record_iternext(next=None):
+Index: src/ZODB/DB.py
+===================================================================
+--- src/ZODB/DB.py (revision 97553)
++++ src/ZODB/DB.py (working copy)
+@@ -456,6 +456,10 @@
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
More information about the Checkins
mailing list