[Checkins] SVN: relstorage/trunk/ A nowait locking strategy pack operation.
Martijn Pieters
mj at zopatista.com
Mon Feb 28 09:12:13 EST 2011
Log message for revision 120603:
A nowait locking strategy pack operation.
Commit locks are requested in nowait mode, and packing only pauses when this
fails (e.g. the commit lock is already taken and busy). A short pack batch
cycle ensures that regular commits are given precedence.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/README.txt
U relstorage/trunk/relstorage/adapters/interfaces.py
U relstorage/trunk/relstorage/adapters/locker.py
U relstorage/trunk/relstorage/adapters/packundo.py
U relstorage/trunk/relstorage/component.xml
U relstorage/trunk/relstorage/options.py
U relstorage/trunk/relstorage/tests/reltestbase.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/CHANGES.txt 2011-02-28 14:12:13 UTC (rev 120603)
@@ -11,6 +11,12 @@
separately, allowing re-use of the pre-pack analysis data or even delegating
the pre-pack phase off to a separate server.
+- Replaced the packing duty cycle with a nowait locking strategy. The pack
+ operation now requests the commit lock without waiting, and pauses if the
+ lock is already taken. It releases the lock after every batch (by default,
+ a batch runs for 1 second).
+ This makes the packing process faster while at the same time yielding to
+ regular ZODB commits when busy.
+
1.5.0b1 (2011-02-05)
--------------------
Modified: relstorage/trunk/README.txt
===================================================================
--- relstorage/trunk/README.txt 2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/README.txt 2011-02-28 14:12:13 UTC (rev 120603)
@@ -478,27 +478,15 @@
timeout in seconds for each batch. Note that some database
configurations have unpredictable I/O performance
and might stall much longer than the timeout.
- The default timeout is 5.0 seconds.
+ The default timeout is 1.0 seconds.
-``pack-duty-cycle``
- After each batch, the pack code pauses for a time to
- allow concurrent transactions to commit. The pack-duty-cycle
- specifies what fraction of time should be spent on packing.
- For example, if the duty cycle is 0.75, then 75% of the time
- will be spent packing: a 6 second pack batch
- will be followed by a 2 second delay. The duty cycle should
- be greater than 0.0 and less than or equal to 1.0. Specify
- 1.0 for no delay between batches.
+``pack-commit-busy-delay``
+ Before each pack batch, the commit lock is requested. If the lock is
+ already held for a regular commit, packing is paused for a short
+ while. This option specifies how long the pack process should be
+ paused before attempting to get the commit lock again.
+ The default delay is 5.0 seconds.
- The default is 0.5. Raise it to finish packing faster; lower it
- to reduce the effect of packing on transaction commit performance.
-
-``pack-max-delay``
- This specifies a maximum delay between pack batches. Sometimes
- the database takes an extra long time to finish a pack batch; at
- those times it is useful to cap the delay imposed by the
- pack-duty-cycle. The default is 20 seconds.
-
``cache-servers``
Specifies a list of memcached servers. Using memcached with
RelStorage improves the speed of frequent object accesses while
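For comparison with the new option, the removed `pack-duty-cycle` and `pack-max-delay` settings derived the pause after each batch from how long that batch took. The sketch below restates the arithmetic of the removed `_pause_pack()` method as a standalone function so the old defaults can be checked by hand; the name `duty_cycle_delay` is hypothetical and not part of RelStorage.

```python
def duty_cycle_delay(elapsed, duty_cycle, max_delay):
    """Pause (seconds) the removed duty-cycle logic imposed after a batch.

    Mirrors the old _pause_pack(): the packer aimed to spend `duty_cycle`
    of wall-clock time packing, with each pause capped at `max_delay`.
    """
    if elapsed == 0.0:
        # The old code assumed at least 10 ms elapsed on coarse timers.
        elapsed = 0.01
    if 0.0 < duty_cycle < 1.0:
        return min(max_delay, elapsed * (1.0 / duty_cycle - 1.0))
    # A duty cycle of 1.0 (or an out-of-range value) meant no pause at all.
    return 0.0


# Old defaults: pack-batch-timeout 5.0, pack-duty-cycle 0.5, pack-max-delay 20.0
print(duty_cycle_delay(5.0, 0.5, 20.0))   # 5.0: one second asleep per second packing
print(duty_cycle_delay(6.0, 0.75, 20.0))  # about 2 s, the old README's example
print(duty_cycle_delay(60.0, 0.5, 20.0))  # 20.0: capped by pack-max-delay
```

With the old defaults the packer slept as long as it worked, even when no commit was waiting; under the nowait strategy it sleeps `pack-commit-busy-delay` seconds only when the commit lock is actually held.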
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py 2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2011-02-28 14:12:13 UTC (rev 120603)
@@ -156,7 +156,7 @@
class ILocker(Interface):
"""Acquire and release the commit and pack locks."""
- def hold_commit_lock(cursor, ensure_current=False):
+ def hold_commit_lock(cursor, ensure_current=False, nowait=False):
"""Acquire the commit lock.
If ensure_current is True, other tables may be locked as well, to
@@ -164,6 +164,10 @@
May raise StorageError if the lock can not be acquired before
some timeout.
+
+ With nowait set to True, make a single attempt to obtain the lock without
+ waiting and return a boolean indicating whether the lock was acquired.
+
"""
def release_commit_lock(cursor):
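As a minimal sketch of the amended `hold_commit_lock()` contract (blocking mode raises `StorageError` on failure, nowait mode reports the outcome as a boolean), the class below implements it with a Python 3 `threading.Lock` instead of a database lock. `ThreadLocker`, its `commit_lock_timeout` default, and the local `StorageError` class are hypothetical; `cursor` is accepted only to match the interface signature, and `ensure_current` is ignored because there are no tables to lock.

```python
import threading


class StorageError(Exception):
    """Stand-in for ZODB's StorageError (hypothetical here)."""


class ThreadLocker(object):
    """Hypothetical in-process locker following the ILocker commit-lock contract."""

    def __init__(self, commit_lock_timeout=30.0):
        self.commit_lock_timeout = commit_lock_timeout
        self._commit_lock = threading.Lock()

    def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
        # ensure_current has no meaning without database tables; ignored here.
        if nowait:
            # Try exactly once; a busy lock is reported, not raised.
            return self._commit_lock.acquire(blocking=False)
        # Blocking mode: wait up to the timeout, then treat failure as an error.
        if not self._commit_lock.acquire(timeout=self.commit_lock_timeout):
            raise StorageError('Acquiring a commit lock failed')
        return True

    def release_commit_lock(self, cursor):
        self._commit_lock.release()


locker = ThreadLocker(commit_lock_timeout=0.1)
print(locker.hold_commit_lock(None))               # True: lock acquired
print(locker.hold_commit_lock(None, nowait=True))  # False: already held
locker.release_commit_lock(None)
```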
Modified: relstorage/trunk/relstorage/adapters/locker.py
===================================================================
--- relstorage/trunk/relstorage/adapters/locker.py 2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/adapters/locker.py 2011-02-28 14:12:13 UTC (rev 120603)
@@ -36,26 +36,33 @@
options=options, lock_exceptions=lock_exceptions)
self.version_detector = version_detector
- def hold_commit_lock(self, cursor, ensure_current=False):
- if ensure_current:
- # Hold commit_lock to prevent concurrent commits
- # (for as short a time as possible).
- # Lock transaction and current_object in share mode to ensure
- # conflict detection has the most current data.
- if self.keep_history:
- stmt = """
- LOCK TABLE commit_lock IN EXCLUSIVE MODE;
- LOCK TABLE transaction IN SHARE MODE;
- LOCK TABLE current_object IN SHARE MODE
- """
+ def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
+ try:
+ if ensure_current:
+ # Hold commit_lock to prevent concurrent commits
+ # (for as short a time as possible).
+ # Lock transaction and current_object in share mode to ensure
+ # conflict detection has the most current data.
+ if self.keep_history:
+ stmt = """
+ LOCK TABLE commit_lock IN EXCLUSIVE MODE%s;
+ LOCK TABLE transaction IN SHARE MODE;
+ LOCK TABLE current_object IN SHARE MODE
+ """ % (nowait and ' NOWAIT' or '',)
+ else:
+ stmt = """
+ LOCK TABLE commit_lock IN EXCLUSIVE MODE%s;
+ LOCK TABLE object_state IN SHARE MODE
+ """ % (nowait and ' NOWAIT' or '',)
+ cursor.execute(stmt)
else:
- stmt = """
- LOCK TABLE commit_lock IN EXCLUSIVE MODE;
- LOCK TABLE object_state IN SHARE MODE
- """
- cursor.execute(stmt)
- else:
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE%s" %
+ (nowait and ' NOWAIT' or '',))
+ except self.lock_exceptions: # psycopg2.DatabaseError:
+ if nowait:
+ return False
+ raise StorageError('Acquiring a commit lock failed')
+ return True
def release_commit_lock(self, cursor):
# no action needed
@@ -96,10 +103,13 @@
class MySQLLocker(Locker):
implements(ILocker)
- def hold_commit_lock(self, cursor, ensure_current=False):
+ def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
+ timeout = not nowait and self.commit_lock_timeout or 0
stmt = "SELECT GET_LOCK(CONCAT(DATABASE(), '.commit'), %s)"
- cursor.execute(stmt, (self.commit_lock_timeout,))
+ cursor.execute(stmt, (timeout,))
locked = cursor.fetchone()[0]
+ if nowait and locked in (0, 1):
+ return bool(locked)
if not locked:
raise StorageError("Unable to acquire commit lock")
@@ -132,18 +142,21 @@
options=options, lock_exceptions=lock_exceptions)
self.inputsize_NUMBER = inputsize_NUMBER
- def hold_commit_lock(self, cursor, ensure_current=False):
+ def hold_commit_lock(self, cursor, ensure_current=False, nowait=False):
# Hold commit_lock to prevent concurrent commits
# (for as short a time as possible).
+ timeout = not nowait and self.commit_lock_timeout or 0
status = cursor.callfunc(
"DBMS_LOCK.REQUEST",
self.inputsize_NUMBER, (
self.commit_lock_id,
6, # exclusive (X_MODE)
- self.commit_lock_timeout,
+ timeout,
True,
))
if status != 0:
+ if nowait and status == 1:
+ return False # Lock failed due to a timeout
if status >= 1 and status <= 5:
msg = ('', 'timeout', 'deadlock', 'parameter error',
'lock already owned', 'illegal handle')[int(status)]
@@ -163,6 +176,7 @@
cursor.execute("LOCK TABLE current_object IN SHARE MODE")
else:
cursor.execute("LOCK TABLE object_state IN SHARE MODE")
+ return True
def release_commit_lock(self, cursor):
# no action needed
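Each backend expresses "try once, don't wait" differently: PostgreSQL appends `NOWAIT` to `LOCK TABLE` (a busy lock then surfaces as a database error that the locker catches), while MySQL and Oracle pass a timeout of 0 and inspect the returned value. The helpers below restate that logic from the diff as pure functions, so each branch can be checked without a database; all function names and the local `StorageError` are hypothetical. They use conditional expressions in place of the diff's `x and a or b` idiom, which gives the same results here.

```python
class StorageError(Exception):
    """Stand-in for ZODB's StorageError (hypothetical here)."""


def request_timeout(commit_lock_timeout, nowait):
    # MySQL GET_LOCK() and Oracle DBMS_LOCK.REQUEST both take a timeout in
    # seconds; 0 turns the request into a single non-blocking attempt.
    return 0 if nowait else commit_lock_timeout


def postgresql_commit_lock_sql(keep_history, ensure_current, nowait):
    """Return the statement PostgreSQLLocker.hold_commit_lock executes."""
    # NOWAIT makes LOCK TABLE fail at once instead of queueing for the lock.
    suffix = ' NOWAIT' if nowait else ''
    if not ensure_current:
        return 'LOCK TABLE commit_lock IN EXCLUSIVE MODE%s' % suffix
    if keep_history:
        return ('LOCK TABLE commit_lock IN EXCLUSIVE MODE%s;\n'
                'LOCK TABLE transaction IN SHARE MODE;\n'
                'LOCK TABLE current_object IN SHARE MODE' % suffix)
    return ('LOCK TABLE commit_lock IN EXCLUSIVE MODE%s;\n'
            'LOCK TABLE object_state IN SHARE MODE' % suffix)


def mysql_lock_result(locked, nowait):
    """Interpret GET_LOCK(): 1 = acquired, 0 = timed out, None (NULL) = error."""
    if nowait and locked in (0, 1):
        return bool(locked)  # a busy lock is an expected outcome, not an error
    if not locked:
        raise StorageError('Unable to acquire commit lock')
    return True


# DBMS_LOCK.REQUEST status codes, mirroring the message tuple in OracleLocker.
ORACLE_STATUS = ('success', 'timeout', 'deadlock', 'parameter error',
                 'lock already owned', 'illegal handle')


def oracle_lock_result(status, nowait):
    """Interpret a DBMS_LOCK.REQUEST status the way the diff's OracleLocker does."""
    if status == 0:
        return True
    if nowait and status == 1:
        return False  # zero-second timeout: another session holds the lock
    if 1 <= status <= 5:
        reason = ORACLE_STATUS[int(status)]
    else:
        reason = 'unknown status %r' % (status,)
    # Assumed wording; the diff does not show OracleLocker's exact error text.
    raise StorageError('Unable to acquire commit lock (%s)' % reason)


print(postgresql_commit_lock_sql(False, False, True))
print(request_timeout(30, nowait=True), mysql_lock_result(0, nowait=True))
```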
Modified: relstorage/trunk/relstorage/adapters/packundo.py
===================================================================
--- relstorage/trunk/relstorage/adapters/packundo.py 2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/adapters/packundo.py 2011-02-28 14:12:13 UTC (rev 120603)
@@ -134,22 +134,15 @@
if batch:
upload_batch()
- def _pause_pack(self, sleep, start):
- """Pause packing to allow concurrent commits."""
+ def _pause_pack_until_lock(self, cursor, sleep):
+ """Pause until we can obtain a nowait commit lock."""
if sleep is None:
sleep = time.sleep
- elapsed = time.time() - start
- if elapsed == 0.0:
- # Compensate for low timer resolution by
- # assuming that at least 10 ms elapsed.
- elapsed = 0.01
- duty_cycle = self.options.pack_duty_cycle
- if duty_cycle > 0.0 and duty_cycle < 1.0:
- delay = min(self.options.pack_max_delay,
- elapsed * (1.0 / duty_cycle - 1.0))
- if delay > 0:
- log.debug('pack: sleeping %.4g second(s)', delay)
- sleep(delay)
+ delay = self.options.pack_commit_busy_delay
+ while not self.locker.hold_commit_lock(cursor, nowait=True):
+ cursor.connection.rollback()
+ log.debug('pack: commit lock busy, sleeping %.4g second(s)', delay)
+ sleep(delay)
class HistoryPreservingPackUndo(PackUndo):
@@ -657,15 +650,16 @@
self.runner.run_script(cursor, stmt)
# Hold the commit lock while packing to prevent deadlocks.
- # Pack in small batches of transactions in order to minimize
- # the interruption of concurrent write operations.
+ # Pack in small batches of transactions only after we are able
+ # to obtain a commit lock in order to minimize the
+ # interruption of concurrent write operations.
start = time.time()
packed_list = []
counter, lastreport, statecounter = 0, 0, 0
# We'll report on progress in at most .1% step increments
reportstep = max(total / 1000, 1)
- self.locker.hold_commit_lock(cursor)
+ self._pause_pack_until_lock(cursor, sleep)
for tid, packed, has_removable in tid_rows:
self._pack_transaction(
cursor, pack_tid, tid, packed, has_removable,
@@ -685,8 +679,7 @@
lastreport = counter / reportstep * reportstep
del packed_list[:]
self.locker.release_commit_lock(cursor)
- self._pause_pack(sleep, start)
- self.locker.hold_commit_lock(cursor)
+ self._pause_pack_until_lock(cursor, sleep)
start = time.time()
if packed_func is not None:
for oid, tid in packed_list:
@@ -1109,14 +1102,15 @@
log.info("pack: will remove %d object(s)", total)
# Hold the commit lock while packing to prevent deadlocks.
- # Pack in small batches of transactions in order to minimize
- # the interruption of concurrent write operations.
+ # Pack in small batches of transactions only after we are able
+ # to obtain a commit lock in order to minimize the
+ # interruption of concurrent write operations.
start = time.time()
packed_list = []
# We'll report on progress in at most .1% step increments
lastreport, reportstep = 0, max(total / 1000, 1)
- self.locker.hold_commit_lock(cursor)
+ self._pause_pack_until_lock(cursor, sleep)
while to_remove:
items = to_remove[:100]
del to_remove[:100]
@@ -1139,8 +1133,7 @@
counter, counter/float(total)*100)
lastreport = counter / reportstep * reportstep
self.locker.release_commit_lock(cursor)
- self._pause_pack(sleep, start)
- self.locker.hold_commit_lock(cursor)
+ self._pause_pack_until_lock(cursor, sleep)
start = time.time()
if packed_func is not None:
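The reworked pack loop alternates between holding the commit lock for one batch and giving it up. The sketch below restates that control flow outside RelStorage: a `threading.Lock` stands in for the database commit lock and `process` for `_pack_transaction`. The batch-end check (`clock() >= start + batch_timeout`) is an assumption, since the diff does not show the line that ends a batch, and the real `_pause_pack_until_lock` also rolls back the connection before each sleep, which has no equivalent here. `pause_until_lock` and `pack_in_batches` are hypothetical names.

```python
import threading
import time


def pause_until_lock(lock, delay, sleep=time.sleep):
    """Retry a non-blocking acquire, sleeping `delay` seconds while it is busy."""
    while not lock.acquire(blocking=False):
        sleep(delay)


def pack_in_batches(items, lock, process, batch_timeout=1.0, busy_delay=5.0,
                    sleep=time.sleep, clock=time.time):
    """Process items while holding `lock`, yielding it after each timed batch."""
    pause_until_lock(lock, busy_delay, sleep)
    held = True
    try:
        start = clock()
        for item in items:
            process(item)
            if clock() >= start + batch_timeout:
                # Batch finished: let waiting committers in, then try again.
                lock.release()
                held = False
                pause_until_lock(lock, busy_delay, sleep)
                held = True
                start = clock()
    finally:
        if held:
            lock.release()


# Simulate a commit in progress when packing starts; the injected sleep plays
# the committer finishing, as checkPackBatchLockNoWait does in the test changes.
commit_lock = threading.Lock()
commit_lock.acquire()
slept = []


def sim_sleep(seconds):
    slept.append(seconds)
    commit_lock.release()


done = []
pack_in_batches([1, 2, 3], commit_lock, done.append, batch_timeout=0,
                sleep=sim_sleep)
print(done, slept)
```

Injecting `sleep` (and `clock`) keeps lock contention deterministic in tests. Note that `threading.Lock` makes no fairness promise, so unlike a database lock queue this sketch does not model a waiting committer winning the race after each release.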
Modified: relstorage/trunk/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml 2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/component.xml 2011-02-28 14:12:13 UTC (rev 120603)
@@ -53,12 +53,9 @@
<key name="pack-batch-timeout" datatype="float" required="no">
<description>See the RelStorage README.txt file.</description>
</key>
- <key name="pack-duty-cycle" datatype="float" required="no">
+ <key name="pack-commit-busy-delay" datatype="float" required="no">
<description>See the RelStorage README.txt file.</description>
</key>
- <key name="pack-max-delay" datatype="float" required="no">
- <description>See the RelStorage README.txt file.</description>
- </key>
<key name="cache-servers" datatype="string" required="no">
<description>See the RelStorage README.txt file.</description>
</key>
Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py 2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/options.py 2011-02-28 14:12:13 UTC (rev 120603)
@@ -39,9 +39,8 @@
self.pack_gc = True
self.pack_prepack_only = False
self.pack_skip_prepack = False
- self.pack_batch_timeout = 5.0
- self.pack_duty_cycle = 0.5
- self.pack_max_delay = 20.0
+ self.pack_batch_timeout = 1.0
+ self.pack_commit_busy_delay = 5.0
self.cache_servers = () # ['127.0.0.1:11211']
self.cache_module_name = 'relstorage.pylibmc_wrapper'
self.cache_prefix = ''
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2011-02-28 14:02:48 UTC (rev 120602)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2011-02-28 14:12:13 UTC (rev 120603)
@@ -478,15 +478,19 @@
finally:
db.close()
- def checkPackDutyCycle(self):
- # Exercise the code in the pack algorithm that releases the
- # commit lock for a time to allow concurrent transactions to commit.
- # pause after every txn
+ def checkPackBatchLockNoWait(self):
+ # Exercise the code in the pack algorithm that attempts to get the
+ # commit lock but will sleep if the lock is busy.
self._storage._adapter.packundo.options.pack_batch_timeout = 0
+ adapter = self._storage._adapter
+ test_conn, test_cursor = adapter.connmanager.open()
slept = []
def sim_sleep(seconds):
slept.append(seconds)
+ adapter.locker.release_commit_lock(test_cursor)
+ test_conn.rollback()
+ adapter.connmanager.close(test_conn, test_cursor)
db = DB(self._storage)
try:
@@ -498,10 +502,11 @@
del r['alpha']
transaction.commit()
- # Pack
+ # Pack, with a commit lock held
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
+ adapter.locker.hold_commit_lock(test_cursor)
self._storage.pack(packtime, referencesf, sleep=sim_sleep)
self.assertTrue(len(slept) > 0)