[Checkins] SVN: relstorage/trunk/ A nowait locking strategy for the pack operation.

Martijn Pieters mj at zopatista.com
Mon Feb 28 09:12:13 EST 2011


Log message for revision 120603:
  A nowait locking strategy for the pack operation.
  
  Commit locks are requested in nowait mode, and packing only pauses when this
  fails (i.e. the commit lock is already held by a busy commit). A short pack
  batch cycle ensures that regular commits are given precedence.
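A minimal sketch of this strategy in plain Python, assuming a `threading.Lock` as a stand-in for the database commit lock. Here `lock.acquire(blocking=False)` plays the role of `hold_commit_lock(cursor, nowait=True)`; `pack_in_batches` and its batches are hypothetical, not RelStorage code.

```python
import threading
import time


def pack_in_batches(batches, lock, busy_delay=5.0, sleep=time.sleep):
    """Process each batch only while holding `lock`, never blocking on it.

    Hypothetical sketch: `batches` is a list of lists of work items and
    `lock` stands in for the database commit lock.
    """
    done = []
    for batch in batches:
        # Try the lock without waiting; if a commit holds it, back off.
        while not lock.acquire(blocking=False):
            sleep(busy_delay)
        try:
            done.extend(batch)  # the "pack" work for this batch
        finally:
            # Release after every batch so regular commits can get in.
            lock.release()
    return done


if __name__ == "__main__":
    commit_lock = threading.Lock()
    commit_lock.acquire()  # simulate a regular commit holding the lock
    slept = []

    def fake_sleep(seconds):
        slept.append(seconds)
        commit_lock.release()  # the "commit" finishes while we sleep

    print(pack_in_batches([[1, 2], [3]], commit_lock, sleep=fake_sleep))  # prints [1, 2, 3]
    print(slept)  # prints [5.0]
```

Unlike a fixed duty cycle, this loop never sleeps while the lock is free; it only yields when another holder actually has it.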

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/README.txt
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  U   relstorage/trunk/relstorage/adapters/locker.py
  U   relstorage/trunk/relstorage/adapters/packundo.py
  U   relstorage/trunk/relstorage/component.xml
  U   relstorage/trunk/relstorage/options.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/CHANGES.txt	2011-02-28 14:12:13 UTC (rev 120603)
@@ -11,6 +11,12 @@
   separately, allowing re-use of the pre-pack analysis data or even delegating
   the pre-pack phase off to a separate server.
 
+- Replaced the packing duty cycle with a nowait locking strategy. The pack
+  operation now requests the commit lock without waiting and pauses if the
+  lock is already taken. It releases the lock after every batch (by default,
+  a batch runs for about 1 second). This makes packing faster while still
+  yielding to regular ZODB commits that need the lock.
+
 1.5.0b1 (2011-02-05)
 --------------------
 

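The CHANGES entry ties the lock cycle to `pack-batch-timeout`: the commit lock is held for one batch, and the batch ends once its processing time reaches the timeout. The sketch below simulates that split with made-up per-item costs instead of a real clock; `split_by_timeout` is hypothetical, and checking the elapsed time after each item is an assumption modelled on the `start = time.time()` bookkeeping visible in packundo.py.

```python
def split_by_timeout(costs, batch_timeout=1.0):
    """Group item indexes into batches that end once elapsed >= batch_timeout.

    Hypothetical sketch: `costs` holds the simulated processing time of each
    item in seconds, standing in for real wall-clock measurements.
    """
    batches, current, elapsed = [], [], 0.0
    for index, cost in enumerate(costs):
        current.append(index)
        elapsed += cost
        if elapsed >= batch_timeout:
            # Time is up: close this batch (the commit lock is released here).
            batches.append(current)
            current, elapsed = [], 0.0
    if current:
        batches.append(current)  # leftover items form the last batch
    return batches


print(split_by_timeout([0.4] * 5))  # prints [[0, 1, 2], [3, 4]]
print(split_by_timeout([0.4] * 3, batch_timeout=0))  # prints [[0], [1], [2]]
```

In this sketch a timeout of 0, as set by the updated test, turns every item into its own batch, so the lock would be released and requested again after each one.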
Modified: relstorage/trunk/README.txt
===================================================================
--- relstorage/trunk/README.txt	2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/README.txt	2011-02-28 14:12:13 UTC (rev 120603)
@@ -478,27 +478,15 @@
         timeout in seconds for each batch.  Note that some database
         configurations have unpredictable I/O performance
         and might stall much longer than the timeout.
-        The default timeout is 5.0 seconds.
+        The default timeout is 1.0 seconds.
 
-``pack-duty-cycle``
-        After each batch, the pack code pauses for a time to
-        allow concurrent transactions to commit.  The pack-duty-cycle
-        specifies what fraction of time should be spent on packing.
-        For example, if the duty cycle is 0.75, then 75% of the time
-        will be spent packing: a 6 second pack batch
-        will be followed by a 2 second delay.  The duty cycle should
-        be greater than 0.0 and less than or equal to 1.0.  Specify
-        1.0 for no delay between batches.
+``pack-commit-busy-delay``
+        Before each pack batch, the commit lock is requested without waiting.
+        If the lock is already held by a regular commit, packing is paused for
+        a short while. This option specifies how long, in seconds, the pack
+        process pauses before attempting to get the commit lock again.
+        The default delay is 5.0 seconds.
 
-        The default is 0.5.  Raise it to finish packing faster; lower it
-        to reduce the effect of packing on transaction commit performance.
-
-``pack-max-delay``
-        This specifies a maximum delay between pack batches.  Sometimes
-        the database takes an extra long time to finish a pack batch; at
-        those times it is useful to cap the delay imposed by the
-        pack-duty-cycle.  The default is 20 seconds.
-
 ``cache-servers``
         Specifies a list of memcached servers. Using memcached with
         RelStorage improves the speed of frequent object accesses while

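For comparison with the removed options, the old delay after each batch followed the formula in the deleted `_pause_pack` method, `min(pack_max_delay, elapsed * (1.0 / duty_cycle - 1.0))`. The function below reproduces that arithmetic in isolation (the name `duty_cycle_delay` is hypothetical) and checks it against the removed README text, where a 6 second batch at a 0.75 duty cycle is followed by a 2 second delay.

```python
def duty_cycle_delay(elapsed, duty_cycle=0.5, max_delay=20.0):
    """Delay imposed by the removed pack-duty-cycle/pack-max-delay options.

    Hypothetical helper reproducing the arithmetic of the old _pause_pack.
    """
    if elapsed == 0.0:
        elapsed = 0.01  # the old code assumed at least 10 ms had elapsed
    if 0.0 < duty_cycle < 1.0:
        return min(max_delay, elapsed * (1.0 / duty_cycle - 1.0))
    return 0.0  # duty cycle of 1.0 (or out of range): no delay


print(round(duty_cycle_delay(6.0, duty_cycle=0.75), 6))  # prints 2.0
print(duty_cycle_delay(5.0))   # prints 5.0 (old defaults: 5 s batch, 0.5 duty)
print(duty_cycle_delay(30.0))  # prints 20.0 (capped by pack-max-delay)
```

Under the old scheme the pack slept for that long even when no commit was waiting; with `pack-commit-busy-delay` it sleeps only when the nowait lock request fails.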
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2011-02-28 14:12:13 UTC (rev 120603)
@@ -156,7 +156,7 @@
 class ILocker(Interface):
     """Acquire and release the commit and pack locks."""
 
-    def hold_commit_lock(cursor, ensure_current=False):
+    def hold_commit_lock(cursor, ensure_current=False, nowait=False):
         """Acquire the commit lock.
 
         If ensure_current is True, other tables may be locked as well, to
@@ -164,6 +164,10 @@
 
         May raise StorageError if the lock can not be acquired before
         some timeout.
+
+        With nowait set to True, only try to obtain the lock without waiting
+        and return a boolean indicating whether the lock was acquired.
+
         """
 
     def release_commit_lock(cursor):

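With this contract, callers need to check the return value only when they pass `nowait=True`; a blocking request still signals failure by raising. An in-memory sketch of that contract, with a `threading.Lock` in place of the database lock (the class, its attributes and the `StorageError` stand-in are hypothetical):

```python
import threading


class StorageError(Exception):
    """Stand-in for ZODB's StorageError."""


class InMemoryLocker(object):
    """Hypothetical locker following the ILocker.hold_commit_lock contract."""

    def __init__(self, commit_lock_timeout=30):
        self.commit_lock_timeout = commit_lock_timeout
        self._lock = threading.Lock()

    def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
        # `cursor` and `ensure_current` are accepted for signature
        # compatibility only; this sketch has no tables to lock.
        if nowait:
            # Try once and report the outcome instead of raising.
            return self._lock.acquire(blocking=False)
        if not self._lock.acquire(timeout=self.commit_lock_timeout):
            raise StorageError("Unable to acquire commit lock")
        return True

    def release_commit_lock(self, cursor):
        self._lock.release()
```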
Modified: relstorage/trunk/relstorage/adapters/locker.py
===================================================================
--- relstorage/trunk/relstorage/adapters/locker.py	2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/adapters/locker.py	2011-02-28 14:12:13 UTC (rev 120603)
@@ -36,26 +36,33 @@
             options=options, lock_exceptions=lock_exceptions)
         self.version_detector = version_detector
 
-    def hold_commit_lock(self, cursor, ensure_current=False):
-        if ensure_current:
-            # Hold commit_lock to prevent concurrent commits
-            # (for as short a time as possible).
-            # Lock transaction and current_object in share mode to ensure
-            # conflict detection has the most current data.
-            if self.keep_history:
-                stmt = """
-                LOCK TABLE commit_lock IN EXCLUSIVE MODE;
-                LOCK TABLE transaction IN SHARE MODE;
-                LOCK TABLE current_object IN SHARE MODE
-                """
+    def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
+        try:
+            if ensure_current:
+                # Hold commit_lock to prevent concurrent commits
+                # (for as short a time as possible).
+                # Lock transaction and current_object in share mode to ensure
+                # conflict detection has the most current data.
+                if self.keep_history:
+                    stmt = """
+                    LOCK TABLE commit_lock IN EXCLUSIVE MODE%s;
+                    LOCK TABLE transaction IN SHARE MODE;
+                    LOCK TABLE current_object IN SHARE MODE
+                    """ % (nowait and ' NOWAIT' or '',)
+                else:
+                    stmt = """
+                    LOCK TABLE commit_lock IN EXCLUSIVE MODE%s;
+                    LOCK TABLE object_state IN SHARE MODE
+                    """ % (nowait and ' NOWAIT' or '',)
+                cursor.execute(stmt)
             else:
-                stmt = """
-                LOCK TABLE commit_lock IN EXCLUSIVE MODE;
-                LOCK TABLE object_state IN SHARE MODE
-                """
-            cursor.execute(stmt)
-        else:
-            cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+                cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE%s" %
+                    (nowait and ' NOWAIT' or '',))
+        except self.lock_exceptions:  # psycopg2.DatabaseError
+            if nowait:
+                return False
+            raise StorageError('Acquiring a commit lock failed')
+        return True
 
     def release_commit_lock(self, cursor):
         # no action needed
@@ -96,10 +103,13 @@
 class MySQLLocker(Locker):
     implements(ILocker)
 
-    def hold_commit_lock(self, cursor, ensure_current=False):
+    def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
+        timeout = not nowait and self.commit_lock_timeout or 0
         stmt = "SELECT GET_LOCK(CONCAT(DATABASE(), '.commit'), %s)"
-        cursor.execute(stmt, (self.commit_lock_timeout,))
+        cursor.execute(stmt, (timeout,))
         locked = cursor.fetchone()[0]
+        if nowait and locked in (0, 1):
+            return bool(locked)
         if not locked:
             raise StorageError("Unable to acquire commit lock")
 
@@ -132,18 +142,21 @@
             options=options, lock_exceptions=lock_exceptions)
         self.inputsize_NUMBER = inputsize_NUMBER
 
-    def hold_commit_lock(self, cursor, ensure_current=False):
+    def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
         # Hold commit_lock to prevent concurrent commits
         # (for as short a time as possible).
+        timeout = not nowait and self.commit_lock_timeout or 0
         status = cursor.callfunc(
             "DBMS_LOCK.REQUEST",
             self.inputsize_NUMBER, (
                 self.commit_lock_id,
                 6,  # exclusive (X_MODE)
-                self.commit_lock_timeout,
+                timeout,
                 True,
             ))
         if status != 0:
+            if nowait and status == 1:
+                return False  # Lock failed due to a timeout
             if status >= 1 and status <= 5:
                 msg = ('', 'timeout', 'deadlock', 'parameter error',
                     'lock already owned', 'illegal handle')[int(status)]
@@ -163,6 +176,7 @@
                 cursor.execute("LOCK TABLE current_object IN SHARE MODE")
             else:
                 cursor.execute("LOCK TABLE object_state IN SHARE MODE")
+        return True
 
     def release_commit_lock(self, cursor):
         # no action needed

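The three lockers translate `nowait` differently: PostgreSQL appends `NOWAIT` to `LOCK TABLE` (which then raises an error instead of waiting), MySQL passes a 0 second timeout to `GET_LOCK()`, and Oracle passes a 0 timeout to `DBMS_LOCK.REQUEST` and treats status 1 (timeout) as "busy". The helpers below isolate that logic so it can be checked without a database. Their names and the `StorageError` stand-in are hypothetical, and `" NOWAIT" if nowait else ""` is the conditional-expression form of the diff's `nowait and ' NOWAIT' or ''` idiom.

```python
class StorageError(Exception):
    """Stand-in for ZODB's StorageError."""


def postgresql_commit_lock_stmt(keep_history, ensure_current, nowait):
    """Build equivalent LOCK TABLE statements to the PostgreSQL locker's."""
    suffix = " NOWAIT" if nowait else ""
    stmts = ["LOCK TABLE commit_lock IN EXCLUSIVE MODE" + suffix]
    if ensure_current:
        # Share locks keep conflict detection data current.
        if keep_history:
            tables = ["transaction", "current_object"]
        else:
            tables = ["object_state"]
        stmts += ["LOCK TABLE %s IN SHARE MODE" % t for t in tables]
    return ";\n".join(stmts)


def mysql_lock_result(locked, nowait):
    """Interpret GET_LOCK(): 1 = acquired, 0 = timed out, None = error."""
    if nowait and locked in (0, 1):
        return bool(locked)
    if not locked:
        raise StorageError("Unable to acquire commit lock")
    return True


ORACLE_STATUS = ('', 'timeout', 'deadlock', 'parameter error',
                 'lock already owned', 'illegal handle')


def oracle_lock_result(status, nowait):
    """Interpret DBMS_LOCK.REQUEST status codes (0 means success)."""
    if status == 0:
        return True
    if nowait and status == 1:
        return False  # only a timeout counts as "busy"
    if 1 <= status <= 5:
        raise StorageError("Unable to acquire commit lock (%s)"
                           % ORACLE_STATUS[int(status)])
    raise StorageError("Unable to acquire commit lock (status %s)" % status)
```

Note that with `nowait` only the "lock is busy" outcome becomes `False`; genuine errors (a MySQL `NULL`, an Oracle deadlock) still raise.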
Modified: relstorage/trunk/relstorage/adapters/packundo.py
===================================================================
--- relstorage/trunk/relstorage/adapters/packundo.py	2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/adapters/packundo.py	2011-02-28 14:12:13 UTC (rev 120603)
@@ -134,22 +134,15 @@
         if batch:
             upload_batch()
 
-    def _pause_pack(self, sleep, start):
-        """Pause packing to allow concurrent commits."""
+    def _pause_pack_until_lock(self, cursor, sleep):
+        """Pause until we can obtain a nowait commit lock."""
         if sleep is None:
             sleep = time.sleep
-        elapsed = time.time() - start
-        if elapsed == 0.0:
-            # Compensate for low timer resolution by
-            # assuming that at least 10 ms elapsed.
-            elapsed = 0.01
-        duty_cycle = self.options.pack_duty_cycle
-        if duty_cycle > 0.0 and duty_cycle < 1.0:
-            delay = min(self.options.pack_max_delay,
-                elapsed * (1.0 / duty_cycle - 1.0))
-            if delay > 0:
-                log.debug('pack: sleeping %.4g second(s)', delay)
-                sleep(delay)
+        delay = self.options.pack_commit_busy_delay
+        while not self.locker.hold_commit_lock(cursor, nowait=True):
+            cursor.connection.rollback()
+            log.debug('pack: commit lock busy, sleeping %.4g second(s)', delay)
+            sleep(delay)
 
 
 class HistoryPreservingPackUndo(PackUndo):
@@ -657,15 +650,16 @@
                     self.runner.run_script(cursor, stmt)
 
                 # Hold the commit lock while packing to prevent deadlocks.
-                # Pack in small batches of transactions in order to minimize
-                # the interruption of concurrent write operations.
+                # Pack in small batches of transactions only after we are able
+                # to obtain a commit lock in order to minimize the
+                # interruption of concurrent write operations.
                 start = time.time()
                 packed_list = []
                 counter, lastreport, statecounter = 0, 0, 0
                 # We'll report on progress in at most .1% step increments
                 reportstep = max(total / 1000, 1)
 
-                self.locker.hold_commit_lock(cursor)
+                self._pause_pack_until_lock(cursor, sleep)
                 for tid, packed, has_removable in tid_rows:
                     self._pack_transaction(
                         cursor, pack_tid, tid, packed, has_removable,
@@ -685,8 +679,7 @@
                             lastreport = counter / reportstep * reportstep
                         del packed_list[:]
                         self.locker.release_commit_lock(cursor)
-                        self._pause_pack(sleep, start)
-                        self.locker.hold_commit_lock(cursor)
+                        self._pause_pack_until_lock(cursor, sleep)
                         start = time.time()
                 if packed_func is not None:
                     for oid, tid in packed_list:
@@ -1109,14 +1102,15 @@
                 log.info("pack: will remove %d object(s)", total)
 
                 # Hold the commit lock while packing to prevent deadlocks.
-                # Pack in small batches of transactions in order to minimize
-                # the interruption of concurrent write operations.
+                # Pack in small batches of transactions only after we are able
+                # to obtain a commit lock in order to minimize the
+                # interruption of concurrent write operations.
                 start = time.time()
                 packed_list = []
                 # We'll report on progress in at most .1% step increments
                 lastreport, reportstep = 0, max(total / 1000, 1)
 
-                self.locker.hold_commit_lock(cursor)
+                self._pause_pack_until_lock(cursor, sleep)
                 while to_remove:
                     items = to_remove[:100]
                     del to_remove[:100]
@@ -1139,8 +1133,7 @@
                                 counter, counter/float(total)*100)
                             lastreport = counter / reportstep * reportstep
                         self.locker.release_commit_lock(cursor)
-                        self._pause_pack(sleep, start)
-                        self.locker.hold_commit_lock(cursor)
+                        self._pause_pack_until_lock(cursor, sleep)
                         start = time.time()
 
                 if packed_func is not None:

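The new `_pause_pack_until_lock` loop can be exercised without a database by faking the locker and the cursor's connection; everything except the loop itself is a hypothetical test double. The rollback matters on PostgreSQL: once `LOCK TABLE ... NOWAIT` fails, the transaction is aborted and further statements are rejected until it is rolled back.

```python
import time


class FakeConnection(object):
    """Hypothetical connection that only counts rollbacks."""

    def __init__(self):
        self.rollbacks = 0

    def rollback(self):
        self.rollbacks += 1


class FakeCursor(object):
    """Hypothetical cursor exposing a `connection`, like a DB-API cursor."""

    def __init__(self):
        self.connection = FakeConnection()


class BusyLocker(object):
    """Hypothetical locker refusing the nowait lock `busy_times` times."""

    def __init__(self, busy_times):
        self.busy_times = busy_times

    def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
        if self.busy_times:
            self.busy_times -= 1
            return False
        return True


def pause_pack_until_lock(locker, cursor, delay, sleep=None):
    """Same loop as _pause_pack_until_lock, with the delay passed in."""
    if sleep is None:
        sleep = time.sleep
    while not locker.hold_commit_lock(cursor, nowait=True):
        # Roll back the failed attempt before sleeping and retrying.
        cursor.connection.rollback()
        sleep(delay)
```

The loop has no retry limit, so a pack keeps deferring for as long as commits keep the lock busy.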
Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml	2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/component.xml	2011-02-28 14:12:13 UTC (rev 120603)
@@ -53,12 +53,9 @@
     <key name="pack-batch-timeout" datatype="float" required="no">
       <description>See the RelStorage README.txt file.</description>
     </key>
-    <key name="pack-duty-cycle" datatype="float" required="no">
+    <key name="pack-commit-busy-delay" datatype="float" required="no">
       <description>See the RelStorage README.txt file.</description>
     </key>
-    <key name="pack-max-delay" datatype="float" required="no">
-      <description>See the RelStorage README.txt file.</description>
-    </key>
     <key name="cache-servers" datatype="string" required="no">
       <description>See the RelStorage README.txt file.</description>
     </key>

Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py	2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/options.py	2011-02-28 14:12:13 UTC (rev 120603)
@@ -39,9 +39,8 @@
         self.pack_gc = True
         self.pack_prepack_only = False
         self.pack_skip_prepack = False
-        self.pack_batch_timeout = 5.0
-        self.pack_duty_cycle = 0.5
-        self.pack_max_delay = 20.0
+        self.pack_batch_timeout = 1.0
+        self.pack_commit_busy_delay = 5.0
         self.cache_servers = ()  # ['127.0.0.1:11211']
         self.cache_module_name = 'relstorage.pylibmc_wrapper'
         self.cache_prefix = ''

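Because `pack-duty-cycle` and `pack-max-delay` were removed from both component.xml and options.py, existing configurations that still set them are likely to need updating. A hypothetical sketch of how hyphenated configuration keys could map onto the new `Options` attributes, rejecting unknown (for example, removed) keys; this `Options` class and `options_from_config` are illustrative stand-ins, not RelStorage's code.

```python
class Options(object):
    """Hypothetical stand-in holding the pack timing defaults from options.py."""

    def __init__(self, **kwargs):
        self.pack_batch_timeout = 1.0
        self.pack_commit_busy_delay = 5.0
        for name, value in kwargs.items():
            if not hasattr(self, name):
                raise TypeError("Unknown option: %s" % name)
            setattr(self, name, value)


def options_from_config(section):
    """Map hyphenated keys (as in component.xml) to Options attributes.

    Assumes every key in `section` is a float option, which holds for
    the two pack timing options shown here.
    """
    kwargs = {}
    for key, value in section.items():
        kwargs[key.replace('-', '_')] = float(value)
    return Options(**kwargs)


opts = options_from_config({'pack-commit-busy-delay': '2.5'})
print(opts.pack_commit_busy_delay, opts.pack_batch_timeout)  # prints 2.5 1.0
```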
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2011-02-28 14:12:13 UTC (rev 120603)
@@ -478,15 +478,19 @@
         finally:
             db.close()
 
-    def checkPackDutyCycle(self):
-        # Exercise the code in the pack algorithm that releases the
-        # commit lock for a time to allow concurrent transactions to commit.
-        # pause after every txn
+    def checkPackBatchLockNoWait(self):
+        # Exercise the code in the pack algorithm that attempts to get the
+        # commit lock but will sleep if the lock is busy.
         self._storage._adapter.packundo.options.pack_batch_timeout = 0
+        adapter = self._storage._adapter
+        test_conn, test_cursor = adapter.connmanager.open()
 
         slept = []
         def sim_sleep(seconds):
             slept.append(seconds)
+            adapter.locker.release_commit_lock(test_cursor)
+            test_conn.rollback()
+            adapter.connmanager.close(test_conn, test_cursor)
 
         db = DB(self._storage)
         try:
@@ -498,10 +502,11 @@
             del r['alpha']
             transaction.commit()
 
-            # Pack
+            # Pack, with a commit lock held
             now = packtime = time.time()
             while packtime <= now:
                 packtime = time.time()
+            adapter.locker.hold_commit_lock(test_cursor)
             self._storage.pack(packtime, referencesf, sleep=sim_sleep)
 
             self.assertTrue(len(slept) > 0)


