[Checkins] SVN: relstorage/trunk/ Added the poll_interval option. It reduces the frequency of polls for object invalidations, lightening the database load.

Shane Hathaway shane at hathawaymix.org
Fri Feb 8 03:35:06 EST 2008


Log message for revision 83662:
  Added the poll_interval option.  It reduces the frequency of polls
  for object invalidations, lightening the database load.  It also
  increases the potential for database conflicts, so use it wisely.
  

Changed:
  U   relstorage/trunk/CHANGELOG.txt
  U   relstorage/trunk/relstorage/component.xml
  U   relstorage/trunk/relstorage/config.py
  U   relstorage/trunk/relstorage/relstorage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt	2008-02-08 00:46:57 UTC (rev 83661)
+++ relstorage/trunk/CHANGELOG.txt	2008-02-08 08:35:05 UTC (rev 83662)
@@ -34,7 +34,11 @@
 
 - Added support for MySQL.  (Version 5.0 is probably the minimum.)
 
+- Added the poll_interval option.  It reduces the frequency of database
+  polls, but it also increases the potential for conflict errors on
+  servers with high write volume.
 
+
 PGStorage 0.4
 
 - Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut

Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml	2008-02-08 00:46:57 UTC (rev 83661)
+++ relstorage/trunk/relstorage/component.xml	2008-02-08 08:35:05 UTC (rev 83662)
@@ -24,6 +24,22 @@
         and is still allowed on a read-only filestorage.
       </description>
     </key>
+    <key name="poll-interval" datatype="float" required="no">
+      <description>
+        Defer polling the database for the specified maximum time interval.
+        Set to 0 (the default) to always poll.  Fractional seconds are
+        allowed.
+
+        Use this to lighten the database load on servers with high read
+        volume and low write volume.  A setting of 1-5 seconds is sufficient
+        for most systems.
+
+        While this setting should not affect database integrity,
+        it increases the probability of basing transactions on stale data,
+        leading to conflicts.  Thus a nonzero setting can hurt
+        the performance of servers with high write volume.
+      </description>
+    </key>
   </sectiontype>
 
   <sectiontype name="postgresql" implements="relstorage.adapter"

Modified: relstorage/trunk/relstorage/config.py
===================================================================
--- relstorage/trunk/relstorage/config.py	2008-02-08 00:46:57 UTC (rev 83661)
+++ relstorage/trunk/relstorage/config.py	2008-02-08 08:35:05 UTC (rev 83662)
@@ -24,7 +24,7 @@
         config = self.config
         adapter = config.adapter.open()
         return RelStorage(adapter, name=config.name, create=config.create,
-            read_only=config.read_only)
+            read_only=config.read_only, poll_interval=config.poll_interval)
 
 
 class PostgreSQLAdapterFactory(BaseConfig):

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2008-02-08 00:46:57 UTC (rev 83661)
+++ relstorage/trunk/relstorage/relstorage.py	2008-02-08 08:35:05 UTC (rev 83662)
@@ -43,13 +43,14 @@
     """Storage to a relational database, based on invalidation polling"""
 
     def __init__(self, adapter, name=None, create=True,
-            read_only=False):
+            read_only=False, poll_interval=0):
         if name is None:
             name = 'RelStorage on %s' % adapter.__class__.__name__
 
         self._adapter = adapter
         self._name = name
         self._is_read_only = read_only
+        self._poll_interval = poll_interval
 
         if create:
             self._adapter.prepare_schema()
@@ -57,7 +58,7 @@
         # load_conn and load_cursor are open most of the time.
         self._load_conn = None
         self._load_cursor = None
-        self._load_started = False
+        self._load_transaction_open = False
         self._open_load_connection()
         # store_conn and store_cursor are open during commit,
         # but not necessarily open at other times.
@@ -87,7 +88,7 @@
         conn, cursor = self._adapter.open_for_load()
         self._drop_load_connection()
         self._load_conn, self._load_cursor = conn, cursor
-        self._load_started = True
+        self._load_transaction_open = True
 
     def _drop_load_connection(self):
         conn, cursor = self._load_conn, self._load_cursor
@@ -101,15 +102,15 @@
 
     def _rollback_load_connection(self):
         if self._load_conn is not None:
-            self._load_started = False
             self._load_conn.rollback()
+            self._load_transaction_open = False
 
     def _start_load(self):
         if self._load_cursor is None:
             self._open_load_connection()
         else:
             self._adapter.restart_load(self._load_cursor)
-            self._load_started = True
+            self._load_transaction_open = True
 
     def _zap(self):
         """Clear all objects out of the database.
@@ -157,7 +158,7 @@
     def load(self, oid, version):
         self._lock_acquire()
         try:
-            if not self._load_started:
+            if not self._load_transaction_open:
                 self._start_load()
             cursor = self._load_cursor
             state, tid_int = self._adapter.load_current(cursor, u64(oid))
@@ -188,7 +189,7 @@
                 # for conflict resolution.
                 cursor = self._store_cursor
             else:
-                if not self._load_started:
+                if not self._load_transaction_open:
                     self._start_load()
                 cursor = self._load_cursor
             state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
@@ -213,7 +214,7 @@
                 # for conflict resolution.
                 cursor = self._store_cursor
             else:
-                if not self._load_started:
+                if not self._load_transaction_open:
                     self._start_load()
                 cursor = self._load_cursor
             if not self._adapter.exists(cursor, u64(oid)):
@@ -678,11 +679,29 @@
     def __init__(self, parent, zodb_conn):
         # self._zodb_conn = zodb_conn
         RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
-            read_only=parent._is_read_only, create=False)
+            create=False, read_only=parent._is_read_only,
+            poll_interval=parent._poll_interval)
         # _prev_polled_tid contains the tid at the previous poll
         self._prev_polled_tid = None
         self._showed_disconnect = False
+        self._poll_at = 0
 
+    def connection_closing(self):
+        """Release resources."""
+        if not self._poll_interval:
+            self._rollback_load_connection()
+        # else keep the load transaction open so that it's possible
+        # to ignore the next poll.
+
+    def sync(self):
+        """Process pending invalidations regardless of poll interval"""
+        self._lock_acquire()
+        try:
+            if self._load_transaction_open:
+                self._rollback_load_connection()
+        finally:
+            self._lock_release()
+
     def poll_invalidations(self, retry=True):
         """Looks for OIDs of objects that changed since _prev_polled_tid
 
@@ -694,6 +713,17 @@
         try:
             if self._closed:
                 return {}
+
+            if self._poll_interval:
+                now = time.time()
+                if self._load_transaction_open and now < self._poll_at:
+                    # It's not yet time to poll again.  The previous load
+                    # transaction is still open, so it's safe to
+                    # ignore this poll.
+                    return {}
+                # else poll now after resetting the timeout
+                self._poll_at = now + self._poll_interval
+
             try:
                 self._rollback_load_connection()
                 self._start_load()
@@ -721,6 +751,7 @@
                 return oids
             except POSException.StorageError:
                 # disconnected
+                self._poll_at = 0
                 if not retry:
                     raise
                 if not self._showed_disconnect:

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2008-02-08 00:46:57 UTC (rev 83661)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2008-02-08 08:35:05 UTC (rev 83662)
@@ -219,3 +219,44 @@
             transaction.commit()
         finally:
             db.close()
+
+    def checkPollInterval(self):
+        # Verify the poll_interval parameter causes RelStorage to
+        # delay invalidation polling.
+        self._storage._poll_interval = 3600 
+        db = DB(self._storage)
+        try:
+            c1 = db.open()
+            r1 = c1.root()
+            r1['alpha'] = 1
+            transaction.commit()
+
+            c2 = db.open()
+            r2 = c2.root()
+            self.assertEqual(r2['alpha'], 1)
+
+            r1['alpha'] = 2
+            # commit c1 without triggering c2.afterCompletion().
+            storage = c1._storage
+            t = transaction.Transaction()
+            storage.tpc_begin(t)
+            c1.commit(t)
+            storage.tpc_vote(t)
+            storage.tpc_finish(t)
+
+            # c2 should not see the change yet
+            r2 = c2.root()
+            self.assertEqual(r2['alpha'], 1)
+
+            # expire the poll timer and verify c2 sees the change
+            c2._storage._poll_at -= 3601
+            c2._flush_invalidations()
+            r2 = c2.root()
+            self.assertEqual(r2['alpha'], 2)
+
+            transaction.abort()
+            c2.close()
+            c1.close()
+
+        finally:
+            db.close()



More information about the Checkins mailing list