[Checkins] SVN: relstorage/trunk/ Modified the pack code to pack
one transaction at a time and release the commit lock frequently
Shane Hathaway
shane at hathawaymix.org
Mon Mar 31 23:03:20 EDT 2008
Log message for revision 85051:
Modified the pack code to pack one transaction at a time and
release the commit lock frequently. This should help large pack
operations.
Changed:
U relstorage/trunk/CHANGELOG.txt
A relstorage/trunk/notes/migrate-1.0.1.txt
U relstorage/trunk/relstorage/adapters/common.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
A relstorage/trunk/relstorage/tests/packstresstest.py
-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt 2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/CHANGELOG.txt 2008-04-01 03:03:20 UTC (rev 85051)
@@ -11,7 +11,11 @@
- Additions to the object_ref table are now periodically committed
during pre_pack so that the work is not lost if pre_pack fails.
+- Modified the pack code to pack one transaction at a time and
+ release the commit lock frequently. This should help large pack
+ operations.
+
RelStorage 1.0.1
- The speedtest script failed if run on a test database that has no tables.
Added: relstorage/trunk/notes/migrate-1.0.1.txt
===================================================================
--- relstorage/trunk/notes/migrate-1.0.1.txt (rev 0)
+++ relstorage/trunk/notes/migrate-1.0.1.txt 2008-04-01 03:03:20 UTC (rev 85051)
@@ -0,0 +1,9 @@
+
+Migrating from version 1.0.1
+
+Create a new index on the object_state table. The new pack code will
+probably crawl if this index is missing. This statement works in all 3
+supported databases:
+
+ CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+
Modified: relstorage/trunk/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py 2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/relstorage/adapters/common.py 2008-04-01 03:03:20 UTC (rev 85051)
@@ -16,6 +16,7 @@
from ZODB.POSException import UndoError
import logging
+import time
log = logging.getLogger("relstorage.adapters.common")
@@ -87,6 +88,13 @@
""",
'reset_temp_undo': "DROP TABLE temp_undo",
+
+ 'transaction_has_data': """
+ SELECT tid
+ FROM object_state
+ WHERE tid = %(tid)s
+ LIMIT 1
+ """,
}
@@ -647,23 +655,27 @@
cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
- def pack(self, pack_tid):
+ def _release_commit_lock(self, cursor):
+ """Release the commit lock during packing"""
+ # no action needed
+ pass
+
+
+ def pack(self, pack_tid, max_batch_time=1.0, delay_time=1.0):
"""Pack. Requires populated pack tables."""
# Read committed mode is sufficient.
conn, cursor = self.open()
try:
try:
- log.info("pack: start, pack_tid = %d", pack_tid)
-
stmt = """
- SELECT COUNT(1)
+ SELECT tid
FROM transaction
WHERE tid > 0
AND tid <= %(pack_tid)s
"""
self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
- transaction_count = cursor.fetchone()[0]
+ tids = [tid for (tid,) in cursor]
stmt = """
SELECT COUNT(1)
@@ -684,91 +696,26 @@
log.info(
"pack: will pack %d transaction(s), delete %s object(s),"
" and trim %s old object(s)",
- transaction_count, delete_count, trim_count)
+ len(tids), delete_count, trim_count)
- # hold the commit lock for a moment to prevent deadlocks.
+ # Hold the commit lock while packing to prevent deadlocks.
+ # Pack in small batches of transactions in order to minimize
+ # the interruption of concurrent write operations.
+ expiration = time.time() + max_batch_time
self._hold_commit_lock(cursor)
+ for tid in tids:
+ self._pack_transaction(cursor, pack_tid, tid)
+ if time.time() > expiration:
+ # commit the work done so far and release the
+ # commit lock for a short time
+ conn.commit()
+ self._release_commit_lock(cursor)
+ if delay_time > 0:
+ log.debug('pack: sleeping %d seconds', delay_time)
+ time.sleep(delay_time)
+ self._hold_commit_lock(cursor)
+ expiration = time.time() + max_batch_time
- for table in ('object_ref', 'current_object', 'object_state'):
-
- if delete_count > 0:
- # Remove objects that are in pack_object and have keep
- # set to false.
- log.debug("pack: deleting objects from %s", table)
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = %%(FALSE)s
- )
- """ % table
- self._run_script_stmt(cursor, stmt)
-
- if trim_count > 0 and table != 'current_object':
- # Cut the history of objects in pack_object that
- # have keep set to true.
- log.debug("pack: trimming objects in %s", table)
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = %%(TRUE)s
- )
- AND tid < (
- SELECT keep_tid
- FROM pack_object
- WHERE zoid = %s.zoid
- )
- """ % (table, table)
- self._run_script_stmt(cursor, stmt)
-
- log.debug("pack: terminating prev_tid chains")
- stmt = """
- UPDATE object_state SET prev_tid = 0
- WHERE tid <= %(pack_tid)s
- AND prev_tid != 0
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-
- # For each tid to be removed, delete the corresponding row in
- # object_refs_added.
- log.debug("pack: deleting from object_refs_added")
- stmt = """
- DELETE FROM object_refs_added
- WHERE tid > 0
- AND tid <= %(pack_tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = object_refs_added.tid
- )
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-
- log.debug("pack: deleting transactions")
- stmt = """
- DELETE FROM transaction
- WHERE tid > 0
- AND tid <= %(pack_tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = transaction.tid
- )
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-
- log.debug("pack: marking transactions as packed")
- stmt = """
- UPDATE transaction SET packed = %(TRUE)s
- WHERE tid > 0
- AND tid <= %(pack_tid)s
- AND packed = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-
log.debug("pack: clearing pack_object")
cursor.execute("DELETE FROM pack_object")
@@ -785,6 +732,79 @@
self.close(conn, cursor)
+ def _pack_transaction(self, cursor, pack_tid, tid):
+ """Pack one transaction. Requires populated pack tables."""
+ log.debug("pack: transaction %d: packing", tid)
+ deleted = 0
+ for table in ('object_ref', 'current_object', 'object_state'):
+ # Remove objects that are in pack_object and have keep
+ # set to false.
+ stmt = """
+ DELETE FROM %s
+ WHERE tid = %%(tid)s
+ AND zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %%(FALSE)s
+ )
+ """ % table
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+ deleted += cursor.rowcount
+
+ if table != 'current_object':
+ # Cut the history of objects in pack_object that
+ # have keep set to true.
+ stmt = """
+ DELETE FROM %s
+ WHERE tid = %%(tid)s
+ AND zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %%(TRUE)s
+ )
+ AND tid < (
+ SELECT keep_tid
+ FROM pack_object
+ WHERE zoid = %s.zoid
+ )
+ """ % (table, table)
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+ deleted += cursor.rowcount
+
+ # Terminate prev_tid chains
+ stmt = """
+ UPDATE object_state SET prev_tid = 0
+ WHERE prev_tid = %(tid)s
+ AND tid <= %(pack_tid)s
+ """
+ self._run_script_stmt(cursor, stmt,
+ {'pack_tid': pack_tid, 'tid': tid})
+
+ # Find out whether the transaction can be removed
+ stmt = self._scripts['transaction_has_data']
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+ has_data = list(cursor)
+
+ if has_data:
+ stmt = """
+ UPDATE transaction SET packed = %(TRUE)s
+ WHERE tid = %(tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+ else:
+ stmt = """
+ DELETE FROM object_refs_added
+ WHERE tid = %(tid)s;
+ DELETE FROM transaction
+ WHERE tid = %(tid)s
+ """
+ self._run_script(cursor, stmt, {'tid': tid})
+ deleted += cursor.rowcount
+
+ log.debug("pack: transaction %d: removed %d row(s)", tid, deleted)
+
+
def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
"""Polls for new transactions.
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2008-04-01 03:03:20 UTC (rev 85051)
@@ -104,6 +104,7 @@
CHECK (tid > 0)
) ENGINE = InnoDB;
CREATE INDEX object_state_tid ON object_state (tid);
+ CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
-- Pointers to the current object state
CREATE TABLE current_object (
@@ -572,4 +573,13 @@
raise StorageError("Unable to acquire commit lock")
+ def _release_commit_lock(self, cursor):
+ """Release the commit lock. This is used during packing.
+
+ This overrides the method by the same name in common.Adapter.
+ """
+ stmt = "SELECT RELEASE_LOCK('relstorage.commit')"
+ cursor.execute(stmt)
+
+
_poll_query = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2008-04-01 03:03:20 UTC (rev 85051)
@@ -61,6 +61,12 @@
'create_temp_pack_visit': None,
'create_temp_undo': None,
'reset_temp_undo': "DELETE FROM temp_undo",
+
+ 'transaction_has_data': """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid = %(tid)s
+ """,
}
def __init__(self, user, password, dsn, twophase=False, arraysize=64):
@@ -133,6 +139,7 @@
state BLOB
);
CREATE INDEX object_state_tid ON object_state (tid);
+ CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
-- Pointers to the current object state
CREATE TABLE current_object (
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2008-04-01 03:03:20 UTC (rev 85051)
@@ -66,6 +66,7 @@
state BYTEA
);
CREATE INDEX object_state_tid ON object_state (tid);
+ CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
-- Pointers to the current object state
CREATE TABLE current_object (
Added: relstorage/trunk/relstorage/tests/packstresstest.py
===================================================================
--- relstorage/trunk/relstorage/tests/packstresstest.py (rev 0)
+++ relstorage/trunk/relstorage/tests/packstresstest.py 2008-04-01 03:03:20 UTC (rev 85051)
@@ -0,0 +1,51 @@
+
+import logging
+
+from ZODB.DB import DB
+from relstorage.adapters.postgresql import PostgreSQLAdapter
+from relstorage.relstorage import RelStorage
+import transaction
+from persistent.mapping import PersistentMapping
+import random
+
+logging.basicConfig()
+logging.getLogger().setLevel(logging.DEBUG)
+
+a = PostgreSQLAdapter(dsn="dbname='packtest'")
+s = RelStorage(a)
+d = DB(s)
+c = d.open()
+
+print 'size:'
+print d.getSize()
+
+if 0:
+ print 'initializing...'
+ container = PersistentMapping()
+ c.root()['container'] = container
+ container_size = 10000
+ for i in range(container_size):
+ container[i] = PersistentMapping()
+ transaction.commit()
+
+ print 'generating transactions...'
+ for trans in range(10000):
+ print trans
+ sources = (random.randint(0, container_size - 1) for j in range(100))
+ for source in sources:
+ obj = container[source]
+ obj[trans] = container[random.randint(0, container_size - 1)]
+ transaction.commit()
+
+ print 'size:'
+ print d.getSize()
+
+ import pdb; pdb.set_trace()
+
+print 'packing...'
+d.pack()
+
+print 'size:'
+print d.getSize()
+
+d.close()
More information about the Checkins
mailing list