[Checkins] SVN: relstorage/trunk/relstorage/ Implement IMVCCStorage, but retain compatibility with the invalidation polling patch.
Shane Hathaway
shane at hathawaymix.org
Sat Apr 25 19:06:13 EDT 2009
Log message for revision 99496:
Implement IMVCCStorage, but retain compatibility with the invalidation polling patch.
Changed:
U relstorage/trunk/relstorage/__init__.py
U relstorage/trunk/relstorage/relstorage.py
U relstorage/trunk/relstorage/tests/reltestbase.py
-=-
Modified: relstorage/trunk/relstorage/__init__.py
===================================================================
--- relstorage/trunk/relstorage/__init__.py 2009-04-25 23:02:05 UTC (rev 99495)
+++ relstorage/trunk/relstorage/__init__.py 2009-04-25 23:06:13 UTC (rev 99496)
@@ -14,9 +14,17 @@
"""relstorage package"""
# perform a compatibility test
-from ZODB.Connection import Connection
-
-if not hasattr(Connection, '_poll_invalidations'):
- raise ImportError('RelStorage requires the invalidation polling '
- 'patch for ZODB.')
-del Connection
+try:
+ from ZODB.interfaces import IMVCCStorage
+ del IMVCCStorage
+except ImportError:
+ # see if the polling patch has been applied
+ from ZODB.Connection import Connection
+ if not hasattr(Connection, '_poll_invalidations'):
+ raise ImportError('RelStorage requires the invalidation polling '
+ 'patch for ZODB.')
+ del Connection
+else:
+ # We're running a version of ZODB that knows what to do with
+ # MVCC storages, so no patch is necessary.
+ pass
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-04-25 23:02:05 UTC (rev 99495)
+++ relstorage/trunk/relstorage/relstorage.py 2009-04-25 23:06:13 UTC (rev 99496)
@@ -19,7 +19,6 @@
import base64
import cPickle
import logging
-import md5
import os
import time
import weakref
@@ -27,6 +26,8 @@
from ZODB.BaseStorage import BaseStorage, TransactionRecord, DataRecord
from ZODB import ConflictResolution, POSException
from persistent.TimeStamp import TimeStamp
+from zope.interface import Interface
+from zope.interface import implements
try:
from ZODB.interfaces import StorageStopIteration
@@ -36,7 +37,19 @@
backwards-compatible exception.
"""
+try:
+ from ZODB.interfaces import IMVCCStorage
+except ImportError:
+ class IMVCCStorage(Interface):
+ """Stub for versions of ZODB that do not define IMVCCStorage.
+ """
+try:
+ from hashlib import md5
+except ImportError:
+ from md5 import new as md5
+
+
log = logging.getLogger("relstorage")
# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
@@ -50,6 +63,7 @@
class RelStorage(BaseStorage,
ConflictResolution.ConflictResolvingStorage):
"""Storage to a relational database, based on invalidation polling"""
+ implements(IMVCCStorage)
def __init__(self, adapter, name=None, create=True,
read_only=False, options=None, **kwoptions):
@@ -121,7 +135,17 @@
else:
self._cache_client = None
+ # _prev_polled_tid contains the tid at the previous poll
+ self._prev_polled_tid = None
+ # _polled_commit_count contains the last polled value of the
+ # 'commit_count' cache key
+ self._polled_commit_count = 0
+
+ # _poll_at is the time to poll regardless of commit_count
+ self._poll_at = 0
+
+
def _open_load_connection(self):
"""Open the load connection to the database. Return nothing."""
conn, cursor = self._adapter.open_for_load()
@@ -199,7 +223,7 @@
def zap_all(self):
"""Clear all objects and transactions out of the database.
- Used by the test suite and migration scripts.
+ Used by the test suite and the ZODBConvert script.
"""
self._adapter.zap_all()
self._rollback_load_connection()
@@ -207,8 +231,18 @@
if cache is not None:
cache.flush_all()
+ def release(self):
+ """Release back end database sessions used by this storage instance.
+ """
+ self._lock_acquire()
+ try:
+ self._drop_load_connection()
+ self._drop_store_connection()
+ finally:
+ self._lock_release()
+
def close(self):
- """Close the connections to the database."""
+ """Close the storage and all instances."""
self._lock_acquire()
try:
self._closed = True
@@ -221,21 +255,17 @@
finally:
self._lock_release()
- def bind_connection(self, zodb_conn):
- """Get a connection-bound storage instance.
+ def new_instance(self):
+ """Creates and returns another storage instance.
- Connections have their own storage instances so that
- the database can provide the MVCC semantics rather than ZODB.
+ See ZODB.interfaces.IMVCCStorage.
"""
- res = BoundRelStorage(self, zodb_conn)
- self._instances.append(weakref.ref(res))
- return res
+ other = RelStorage(adapter=self._adapter, name=self._name,
+ create=False, read_only=self._is_read_only,
+ options=self._options)
+ self._instances.append(weakref.ref(other))
+ return other
- def connection_closing(self):
- """Release resources."""
- # Note that this is overridden in BoundRelStorage.
- self._rollback_load_connection()
-
def __len__(self):
return self._adapter.get_object_count()
@@ -243,15 +273,6 @@
"""Return database size in bytes"""
return self._adapter.get_db_size()
- def _get_oid_cache_key(self, oid_int):
- """Return the cache key for finding the current tid.
-
- This is overridden by BoundRelStorage. This version always returns
- None because a non-bound storage does not have a prev_polled_tid,
- which is required for cache invalidation.
- """
- return None
-
def _log_keyerror(self, oid_int, reason):
"""Log just before raising KeyError in load().
@@ -286,6 +307,13 @@
msg.append("Recent object tids: %s" % repr(tids))
log.warning('; '.join(msg))
+ def _get_oid_cache_key(self, oid_int):
+ """Return the cache key for finding the current tid."""
+ my_tid = self._prev_polled_tid
+ if my_tid is None:
+ return None
+ return 'tid:%d:%d' % (oid_int, my_tid)
+
def load(self, oid, version):
oid_int = u64(oid)
cache = self._cache_client
@@ -423,7 +451,7 @@
# attempting to store objects after the vote phase has finished.
# That should not happen, should it?
assert self._prepared_txn is None
- md5sum = md5.new(data).hexdigest()
+ md5sum = md5(data).hexdigest()
adapter = self._adapter
cursor = self._store_cursor
@@ -458,7 +486,7 @@
assert self._tid is not None
assert self._prepared_txn is None
if data is not None:
- md5sum = md5.new(data).hexdigest()
+ md5sum = md5(data).hexdigest()
else:
# George Bailey object
md5sum = None
@@ -590,7 +618,7 @@
else:
# resolved
data = rdata
- md5sum = md5.new(data).hexdigest()
+ md5sum = md5(data).hexdigest()
self._adapter.replace_temp(
cursor, oid_int, prev_tid_int, md5sum, data)
resolved.add(oid)
@@ -877,7 +905,6 @@
else:
# Now pack.
adapter.pack(tid_int, self._options, sleep=sleep)
- self._after_pack()
finally:
adapter.release_pack_lock(lock_cursor)
finally:
@@ -885,52 +912,23 @@
adapter.close(lock_conn, lock_cursor)
- def _after_pack(self):
- """Reset the transaction state after packing."""
- # The tests depend on this.
- self._rollback_load_connection()
-
def iterator(self, start=None, stop=None):
return TransactionIterator(self._adapter, start, stop)
+ def sync(self, force=True):
+ """Updates to a current view of the database.
-class BoundRelStorage(RelStorage):
- """Storage to a database, bound to a particular ZODB.Connection."""
+ This is implemented by rolling back the transaction.
- # The propagate_invalidations flag, set to a false value, tells
- # the Connection not to propagate object invalidations across
- # connections, since that ZODB feature is detrimental when the
- # storage provides its own MVCC.
- propagate_invalidations = False
-
- def __init__(self, parent, zodb_conn):
- # self._zodb_conn = zodb_conn
- RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
- create=False, read_only=parent._is_read_only,
- options=parent._options)
- # _prev_polled_tid contains the tid at the previous poll
- self._prev_polled_tid = None
- # _commit_count contains the last polled value of the
- # 'commit_count' cache key
- self._commit_count = 0
- # _poll_at is the time to poll regardless of commit_count
- self._poll_at = 0
-
- def _get_oid_cache_key(self, oid_int):
- my_tid = self._prev_polled_tid
- if my_tid is None:
- return None
- return 'tid:%d:%d' % (oid_int, my_tid)
-
- def connection_closing(self):
- """Release resources."""
- if not self._options.poll_interval:
- self._rollback_load_connection()
- # else keep the load transaction open so that it's possible
- # to ignore the next poll.
-
- def sync(self):
- """Process pending invalidations regardless of poll interval"""
+ If force is False and a poll interval has been set, this call
+ is ignored. The poll_invalidations method will later choose to
+ sync with the database only if enough time has elapsed since
+ the last poll.
+ """
+ if not force and self._options.poll_interval:
+ # keep the load transaction open so that it's possible
+ # to ignore the next poll.
+ return
self._lock_acquire()
try:
if self._load_transaction_open:
@@ -945,9 +943,9 @@
cache = self._cache_client
if cache is not None:
new_commit_count = cache.get('commit_count')
- if new_commit_count != self._commit_count:
+ if new_commit_count != self._polled_commit_count:
# There is new data ready to poll
- self._commit_count = new_commit_count
+ self._polled_commit_count = new_commit_count
self._poll_at = now
return True
@@ -1006,12 +1004,30 @@
finally:
self._lock_release()
- def _after_pack(self):
- # Override transaction reset after packing. If the connection
- # wants to see the new state, it should call sync().
- pass
+ # The propagate_invalidations flag implements the old
+ # invalidation polling API and is not otherwise used. Set to a
+ # false value, it tells the Connection not to propagate object
+ # invalidations across connections, since that ZODB feature is
+ # detrimental when the storage provides its own MVCC.
+ propagate_invalidations = False
+ def bind_connection(self, zodb_conn):
+ """Make a new storage instance.
+ This implements the old invalidation polling API and is not
+ otherwise used.
+ """
+ return self.new_instance()
+
+ def connection_closing(self):
+ """Release resources
+
+ This implements the old invalidation polling API and is not
+ otherwise used.
+ """
+ self.sync(False)
+
+
class TransactionIterator(object):
"""Iterate over the transactions in a RelStorage instance."""
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2009-04-25 23:02:05 UTC (rev 99495)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2009-04-25 23:06:13 UTC (rev 99496)
@@ -235,13 +235,12 @@
# Store an object, cache it, then retrieve it from the cache
self._storage._options.cache_servers = 'x:1 y:2'
self._storage._options.cache_module_name = fakecache.__name__
- fakecache.data.clear()
db = DB(self._storage)
try:
c1 = db.open()
self.assertEqual(c1._storage._cache_client.servers, ['x:1', 'y:2'])
- self.assertEqual(len(fakecache.data), 0)
+ fakecache.data.clear()
r1 = c1.root()
# the root tid and state should now be cached
self.assertEqual(len(fakecache.data), 2)
@@ -498,6 +497,7 @@
while packtime <= now:
packtime = time.time()
self._storage.pack(packtime, referencesf)
+ self._storage.sync()
if expect_object_deleted:
# The object should now be gone
More information about the Checkins
mailing list