[Checkins] SVN: relstorage/trunk/relstorage/ Implement IMVCCStorage, but retain compatibility with the invalidation polling patch.

Shane Hathaway shane at hathawaymix.org
Sat Apr 25 19:06:13 EDT 2009


Log message for revision 99496:
  Implement IMVCCStorage, but retain compatibility with the invalidation polling patch.
  

Changed:
  U   relstorage/trunk/relstorage/__init__.py
  U   relstorage/trunk/relstorage/relstorage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/relstorage/__init__.py
===================================================================
--- relstorage/trunk/relstorage/__init__.py	2009-04-25 23:02:05 UTC (rev 99495)
+++ relstorage/trunk/relstorage/__init__.py	2009-04-25 23:06:13 UTC (rev 99496)
@@ -14,9 +14,17 @@
 """relstorage package"""
 
 # perform a compatibility test
-from ZODB.Connection import Connection
-
-if not hasattr(Connection, '_poll_invalidations'):
-    raise ImportError('RelStorage requires the invalidation polling '
-        'patch for ZODB.')
-del Connection
+try:
+    from ZODB.interfaces import IMVCCStorage
+    del IMVCCStorage
+except ImportError:
+    # see if the polling patch has been applied
+    from ZODB.Connection import Connection
+    if not hasattr(Connection, '_poll_invalidations'):
+        raise ImportError('RelStorage requires the invalidation polling '
+            'patch for ZODB.')
+    del Connection
+else:
+    # We're running a version of ZODB that knows what to do with
+    # MVCC storages, so no patch is necessary.
+    pass

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2009-04-25 23:02:05 UTC (rev 99495)
+++ relstorage/trunk/relstorage/relstorage.py	2009-04-25 23:06:13 UTC (rev 99496)
@@ -19,7 +19,6 @@
 import base64
 import cPickle
 import logging
-import md5
 import os
 import time
 import weakref
@@ -27,6 +26,8 @@
 from ZODB.BaseStorage import BaseStorage, TransactionRecord, DataRecord
 from ZODB import ConflictResolution, POSException
 from persistent.TimeStamp import TimeStamp
+from zope.interface import Interface
+from zope.interface import implements
 
 try:
     from ZODB.interfaces import StorageStopIteration
@@ -36,7 +37,19 @@
         backwards-compatible exception.
         """
 
+try:
+    from ZODB.interfaces import IMVCCStorage
+except ImportError:
+    class IMVCCStorage(Interface):
+        """Stub for versions of ZODB that do not define IMVCCStorage.
+        """
 
+try:
+    from hashlib import md5
+except ImportError:
+    from md5 import new as md5
+
+
 log = logging.getLogger("relstorage")
 
 # Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
@@ -50,6 +63,7 @@
 class RelStorage(BaseStorage,
                 ConflictResolution.ConflictResolvingStorage):
     """Storage to a relational database, based on invalidation polling"""
+    implements(IMVCCStorage)
 
     def __init__(self, adapter, name=None, create=True,
             read_only=False, options=None, **kwoptions):
@@ -121,7 +135,17 @@
         else:
             self._cache_client = None
 
+        # _prev_polled_tid contains the tid at the previous poll
+        self._prev_polled_tid = None
 
+        # _polled_commit_count contains the last polled value of the
+        # 'commit_count' cache key
+        self._polled_commit_count = 0
+
+        # _poll_at is the time to poll regardless of commit_count
+        self._poll_at = 0
+
+
     def _open_load_connection(self):
         """Open the load connection to the database.  Return nothing."""
         conn, cursor = self._adapter.open_for_load()
@@ -199,7 +223,7 @@
     def zap_all(self):
         """Clear all objects and transactions out of the database.
 
-        Used by the test suite and migration scripts.
+        Used by the test suite and the ZODBConvert script.
         """
         self._adapter.zap_all()
         self._rollback_load_connection()
@@ -207,8 +231,18 @@
         if cache is not None:
             cache.flush_all()
 
+    def release(self):
+        """Release back end database sessions used by this storage instance.
+        """
+        self._lock_acquire()
+        try:
+            self._drop_load_connection()
+            self._drop_store_connection()
+        finally:
+            self._lock_release()
+
     def close(self):
-        """Close the connections to the database."""
+        """Close the storage and all instances."""
         self._lock_acquire()
         try:
             self._closed = True
@@ -221,21 +255,17 @@
         finally:
             self._lock_release()
 
-    def bind_connection(self, zodb_conn):
-        """Get a connection-bound storage instance.
+    def new_instance(self):
+        """Creates and returns another storage instance.
 
-        Connections have their own storage instances so that
-        the database can provide the MVCC semantics rather than ZODB.
+        See ZODB.interfaces.IMVCCStorage.
         """
-        res = BoundRelStorage(self, zodb_conn)
-        self._instances.append(weakref.ref(res))
-        return res
+        other = RelStorage(adapter=self._adapter, name=self._name,
+            create=False, read_only=self._is_read_only,
+            options=self._options)
+        self._instances.append(weakref.ref(other))
+        return other
 
-    def connection_closing(self):
-        """Release resources."""
-        # Note that this is overridden in BoundRelStorage.
-        self._rollback_load_connection()
-
     def __len__(self):
         return self._adapter.get_object_count()
 
@@ -243,15 +273,6 @@
         """Return database size in bytes"""
         return self._adapter.get_db_size()
 
-    def _get_oid_cache_key(self, oid_int):
-        """Return the cache key for finding the current tid.
-
-        This is overridden by BoundRelStorage.  This version always returns
-        None because a non-bound storage does not have a prev_polled_tid,
-        which is required for cache invalidation.
-        """
-        return None
-
     def _log_keyerror(self, oid_int, reason):
         """Log just before raising KeyError in load().
 
@@ -286,6 +307,13 @@
         msg.append("Recent object tids: %s" % repr(tids))
         log.warning('; '.join(msg))
 
+    def _get_oid_cache_key(self, oid_int):
+        """Return the cache key for finding the current tid."""
+        my_tid = self._prev_polled_tid
+        if my_tid is None:
+            return None
+        return 'tid:%d:%d' % (oid_int, my_tid)
+
     def load(self, oid, version):
         oid_int = u64(oid)
         cache = self._cache_client
@@ -423,7 +451,7 @@
         # attempting to store objects after the vote phase has finished.
         # That should not happen, should it?
         assert self._prepared_txn is None
-        md5sum = md5.new(data).hexdigest()
+        md5sum = md5(data).hexdigest()
 
         adapter = self._adapter
         cursor = self._store_cursor
@@ -458,7 +486,7 @@
         assert self._tid is not None
         assert self._prepared_txn is None
         if data is not None:
-            md5sum = md5.new(data).hexdigest()
+            md5sum = md5(data).hexdigest()
         else:
             # George Bailey object
             md5sum = None
@@ -590,7 +618,7 @@
             else:
                 # resolved
                 data = rdata
-                md5sum = md5.new(data).hexdigest()
+                md5sum = md5(data).hexdigest()
                 self._adapter.replace_temp(
                     cursor, oid_int, prev_tid_int, md5sum, data)
                 resolved.add(oid)
@@ -877,7 +905,6 @@
                 else:
                     # Now pack.
                     adapter.pack(tid_int, self._options, sleep=sleep)
-                    self._after_pack()
             finally:
                 adapter.release_pack_lock(lock_cursor)
         finally:
@@ -885,52 +912,23 @@
             adapter.close(lock_conn, lock_cursor)
 
 
-    def _after_pack(self):
-        """Reset the transaction state after packing."""
-        # The tests depend on this.
-        self._rollback_load_connection()
-
     def iterator(self, start=None, stop=None):
         return TransactionIterator(self._adapter, start, stop)
 
+    def sync(self, force=True):
+        """Updates to a current view of the database.
 
-class BoundRelStorage(RelStorage):
-    """Storage to a database, bound to a particular ZODB.Connection."""
+        This is implemented by rolling back the transaction.
 
-    # The propagate_invalidations flag, set to a false value, tells
-    # the Connection not to propagate object invalidations across
-    # connections, since that ZODB feature is detrimental when the
-    # storage provides its own MVCC.
-    propagate_invalidations = False
-
-    def __init__(self, parent, zodb_conn):
-        # self._zodb_conn = zodb_conn
-        RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
-            create=False, read_only=parent._is_read_only,
-            options=parent._options)
-        # _prev_polled_tid contains the tid at the previous poll
-        self._prev_polled_tid = None
-        # _commit_count contains the last polled value of the
-        # 'commit_count' cache key
-        self._commit_count = 0
-        # _poll_at is the time to poll regardless of commit_count
-        self._poll_at = 0
-
-    def _get_oid_cache_key(self, oid_int):
-        my_tid = self._prev_polled_tid
-        if my_tid is None:
-            return None
-        return 'tid:%d:%d' % (oid_int, my_tid)
-
-    def connection_closing(self):
-        """Release resources."""
-        if not self._options.poll_interval:
-            self._rollback_load_connection()
-        # else keep the load transaction open so that it's possible
-        # to ignore the next poll.
-
-    def sync(self):
-        """Process pending invalidations regardless of poll interval"""
+        If force is False and a poll interval has been set, this call
+        is ignored. The poll_invalidations method will later choose to
+        sync with the database only if enough time has elapsed since
+        the last poll.
+        """
+        if not force and self._options.poll_interval:
+            # keep the load transaction open so that it's possible
+            # to ignore the next poll.
+            return
         self._lock_acquire()
         try:
             if self._load_transaction_open:
@@ -945,9 +943,9 @@
         cache = self._cache_client
         if cache is not None:
             new_commit_count = cache.get('commit_count')
-            if new_commit_count != self._commit_count:
+            if new_commit_count != self._polled_commit_count:
                 # There is new data ready to poll
-                self._commit_count = new_commit_count
+                self._polled_commit_count = new_commit_count
                 self._poll_at = now
                 return True
 
@@ -1006,12 +1004,30 @@
         finally:
             self._lock_release()
 
-    def _after_pack(self):
-        # Override transaction reset after packing.  If the connection
-        # wants to see the new state, it should call sync().
-        pass
+    # The propagate_invalidations flag implements the old
+    # invalidation polling API and is not otherwise used. Set to a
+    # false value, it tells the Connection not to propagate object
+    # invalidations across connections, since that ZODB feature is
+    # detrimental when the storage provides its own MVCC.
+    propagate_invalidations = False
 
+    def bind_connection(self, zodb_conn):
+        """Make a new storage instance.
 
+        This implements the old invalidation polling API and is not
+        otherwise used.
+        """
+        return self.new_instance()
+
+    def connection_closing(self):
+        """Release resources
+
+        This implements the old invalidation polling API and is not
+        otherwise used.
+        """
+        self.sync(False)
+
+
 class TransactionIterator(object):
     """Iterate over the transactions in a RelStorage instance."""
 

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2009-04-25 23:02:05 UTC (rev 99495)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2009-04-25 23:06:13 UTC (rev 99496)
@@ -235,13 +235,12 @@
         # Store an object, cache it, then retrieve it from the cache
         self._storage._options.cache_servers = 'x:1 y:2'
         self._storage._options.cache_module_name = fakecache.__name__
-        fakecache.data.clear()
 
         db = DB(self._storage)
         try:
             c1 = db.open()
             self.assertEqual(c1._storage._cache_client.servers, ['x:1', 'y:2'])
-            self.assertEqual(len(fakecache.data), 0)
+            fakecache.data.clear()
             r1 = c1.root()
             # the root tid and state should now be cached
             self.assertEqual(len(fakecache.data), 2)
@@ -498,6 +497,7 @@
             while packtime <= now:
                 packtime = time.time()
             self._storage.pack(packtime, referencesf)
+            self._storage.sync()
 
             if expect_object_deleted:
                 # The object should now be gone



More information about the Checkins mailing list