[Checkins] SVN: relstorage/branches/ Added a 1.1 branch for pack
optimizations
Shane Hathaway
shane at hathawaymix.org
Tue Apr 8 03:58:59 EDT 2008
Log message for revision 85160:
Added a 1.1 branch for pack optimizations
Changed:
A relstorage/branches/
A relstorage/branches/1.1/
U relstorage/branches/1.1/notes/migrate-1.0.1.txt
U relstorage/branches/1.1/relstorage/adapters/common.py
U relstorage/branches/1.1/relstorage/adapters/postgresql.py
U relstorage/branches/1.1/relstorage/tests/packstresstest.py
-=-
Copied: relstorage/branches/1.1 (from rev 85056, relstorage/trunk)
Modified: relstorage/branches/1.1/notes/migrate-1.0.1.txt
===================================================================
--- relstorage/trunk/notes/migrate-1.0.1.txt 2008-04-01 07:00:46 UTC (rev 85056)
+++ relstorage/branches/1.1/notes/migrate-1.0.1.txt 2008-04-08 07:58:59 UTC (rev 85160)
@@ -1,17 +1,32 @@
Migrating from version 1.0.1
-Create a new index on the object_state table. The new iterative pack code
-will operate slowly without this index. The following statement works in
-all 3 supported databases:
+PostgreSQL:
CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
-Also, for PostgreSQL only, apply this pack optimization:
-
DROP INDEX pack_object_keep_zoid;
CREATE INDEX pack_object_keep_false ON pack_object (zoid)
WHERE keep = false;
CREATE INDEX pack_object_keep_true ON pack_object (zoid, keep_tid)
WHERE keep = true;
+ ALTER TABLE transaction ADD COLUMN empty BOOLEAN NOT NULL DEFAULT FALSE;
+
+ CREATE INDEX current_object_tid ON current_object (tid);
+
+ ALTER TABLE object_ref PRIMARY KEY (tid, zoid, to_zoid);
+ DROP INDEX object_ref_from;
+ DROP INDEX object_ref_tid;
+ DROP INDEX object_ref_to;
+
+ CREATE TABLE pack_state (
+ tid BIGINT NOT NULL,
+ zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid)
+ );
+
+ CREATE TABLE pack_state_tid (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+
Modified: relstorage/branches/1.1/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py 2008-04-01 07:00:46 UTC (rev 85056)
+++ relstorage/branches/1.1/relstorage/adapters/common.py 2008-04-08 07:58:59 UTC (rev 85160)
@@ -42,6 +42,7 @@
'TRUE': 'TRUE',
'FALSE': 'FALSE',
'OCTET_LENGTH': 'OCTET_LENGTH',
+ 'TRUNCATE': 'TRUNCATE',
'oid': '%(oid)s',
'tid': '%(tid)s',
'pack_tid': '%(pack_tid)s',
@@ -403,6 +404,53 @@
log.info("pre_pack: start without gc")
self._pre_pack_without_gc(
conn, cursor, pack_tid)
+ conn.commit()
+
+ log.info("pre_pack: enumerating states to pack")
+ stmt = "%(TRUNCATE)s pack_state"
+ self._run_script_stmt(cursor, stmt)
+ to_remove = 0
+
+ if gc:
+ stmt = """
+ INSERT INTO pack_state (tid, zoid)
+ SELECT tid, zoid
+ FROM object_state
+ JOIN pack_object USING (zoid)
+ WHERE keep = %(FALSE)s
+ AND tid > 0
+ AND tid <= %(pack_tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'pack_tid':
+ pack_tid})
+ to_remove += cursor.rowcount
+
+ stmt = """
+ INSERT INTO pack_state (tid, zoid)
+ SELECT tid, zoid
+ FROM object_state
+ JOIN pack_object USING (zoid)
+ WHERE keep = %(TRUE)s
+ AND tid > 0
+ AND tid != keep_tid
+ AND tid <= %(pack_tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'pack_tid':pack_tid})
+ to_remove += cursor.rowcount
+
+ log.info("pre_pack: enumerating transactions to pack")
+ stmt = "%(TRUNCATE)s pack_state_tid"
+ self._run_script_stmt(cursor, stmt)
+ stmt = """
+ INSERT INTO pack_state_tid (tid)
+ SELECT DISTINCT tid
+ FROM pack_state
+ """
+ cursor.execute(stmt)
+
+ log.info("pre_pack: will remove %d object state(s)",
+ to_remove)
+
except:
log.exception("pre_pack: failed")
conn.rollback()
@@ -425,7 +473,7 @@
log.debug("pre_pack: populating pack_object")
subselect = self._scripts['select_keep_tid']
stmt = """
- DELETE FROM pack_object;
+ %(TRUNCATE)s pack_object;
INSERT INTO pack_object (zoid, keep)
SELECT DISTINCT zoid, %(TRUE)s
@@ -450,7 +498,7 @@
# removed (if nothing references the OID) or whose history will
# be cut.
stmt = """
- DELETE FROM pack_object;
+ %(TRUNCATE)s pack_object;
INSERT INTO pack_object (zoid, keep)
SELECT DISTINCT zoid, %(FALSE)s
@@ -503,7 +551,7 @@
# references.
subselect = self._scripts['select_keep_tid']
stmt = """
- DELETE FROM temp_pack_visit;
+ %(TRUNCATE)s temp_pack_visit;
INSERT INTO temp_pack_visit (zoid)
SELECT zoid
@@ -537,7 +585,7 @@
self._run_script_stmt(cursor, stmt)
found_count = cursor.rowcount
- log.info("pre_pack: found %d more referenced object(s) in "
+ log.debug("pre_pack: found %d more referenced object(s) in "
"pass %d", found_count, pass_num)
if not found_count:
# No new references detected.
@@ -640,7 +688,7 @@
"""Fill object_refs with all states for multiple transactions."""
if tids:
added = 0
- log.info("pre_pack: examining all references from objects in %d "
+ log.info("pre_pack: discovering references from objects in %d "
"transaction(s)" % len(tids))
for tid in tids:
added += self._add_refs_for_tid(cursor, tid, get_references)
@@ -661,7 +709,8 @@
pass
- def pack(self, pack_tid, batch_timeout=2.0, min_delay=2.0, max_delay=15.0):
+ def pack(self, pack_tid, batch_timeout=5.0, min_delay=5.0,
+ max_delay=20.0):
"""Pack. Requires populated pack tables."""
# Read committed mode is sufficient.
@@ -669,45 +718,30 @@
try:
try:
stmt = """
- SELECT tid
+ SELECT transaction.tid,
+ CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
+ CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
FROM transaction
- WHERE tid > 0
- AND tid <= %(pack_tid)s
+ LEFT JOIN pack_state_tid USING (tid)
+ WHERE transaction.tid > 0
+ AND transaction.tid <= %(pack_tid)s
+ AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
"""
self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
- tids = [tid for (tid,) in cursor]
- tids.sort() # oldest first
+ tid_rows = list(cursor)
+ tid_rows.sort() # oldest first
- stmt = """
- SELECT COUNT(1)
- FROM pack_object
- WHERE keep = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt)
- delete_count = cursor.fetchone()[0]
+ log.info("pack: will pack %d transaction(s)", len(tid_rows))
- stmt = """
- SELECT COUNT(1)
- FROM pack_object
- WHERE keep = %(TRUE)s
- """
- self._run_script_stmt(cursor, stmt)
- trim_count = cursor.fetchone()[0]
-
- log.info(
- "pack: will pack %d transaction(s), delete %s object(s),"
- " and trim %s old object(s)",
- len(tids), delete_count, trim_count)
-
# Hold the commit lock while packing to prevent deadlocks.
# Pack in small batches of transactions in order to minimize
# the interruption of concurrent write operations.
start = time.time()
self._hold_commit_lock(cursor)
- for tid in tids:
- self._pack_transaction(cursor, pack_tid, tid)
- elapsed = time.time() - start
- if elapsed >= batch_timeout:
+ for tid, packed, has_states_to_remove in tid_rows:
+ self._pack_transaction(
+ cursor, pack_tid, tid, packed, has_states_to_remove)
+ if time.time() >= start + batch_timeout:
# commit the work done so far and release the
# commit lock for a short time
conn.commit()
@@ -715,6 +749,7 @@
# Add a delay that matches the amount of time
# spent with the commit lock held, within limits.
# This targets a 50% duty cycle.
+ elapsed = time.time() - start
delay = max(min_delay, min(max_delay, elapsed))
if delay > 0:
log.debug('pack: sleeping %.4g second(s)', delay)
@@ -722,8 +757,7 @@
self._hold_commit_lock(cursor)
start = time.time()
- log.debug("pack: clearing pack_object")
- cursor.execute("DELETE FROM pack_object")
+ self._pack_cleanup(conn, cursor)
except:
log.exception("pack: failed")
@@ -738,76 +772,103 @@
self.close(conn, cursor)
- def _pack_transaction(self, cursor, pack_tid, tid):
+ def _pack_transaction(self, cursor, pack_tid, tid, packed,
+ has_states_to_remove):
"""Pack one transaction. Requires populated pack tables."""
log.debug("pack: transaction %d: packing", tid)
- deleted = 0
- for _table in ('object_ref', 'current_object', 'object_state'):
- # Remove objects that are in pack_object and have keep
- # set to false.
- stmt = """
- DELETE FROM _table
- WHERE tid = %(tid)s
- AND zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = %(FALSE)s
- )
- """.replace('_table', _table)
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- deleted += cursor.rowcount
+ counters = {}
- if _table != 'current_object':
- # Cut the history of objects in pack_object that
- # have keep set to true.
+ if has_states_to_remove:
+ for _table in ('current_object', 'object_state'):
stmt = """
DELETE FROM _table
WHERE tid = %(tid)s
AND zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = %(TRUE)s
- AND keep_tid != %(tid)s
+ SELECT pack_state.zoid
+ FROM pack_state
+ WHERE pack_state.tid = %(tid)s
)
""".replace('_table', _table)
self._run_script_stmt(cursor, stmt, {'tid': tid})
- deleted += cursor.rowcount
+ counters[_table] = cursor.rowcount
- # Terminate prev_tid chains
- stmt = """
- UPDATE object_state SET prev_tid = 0
- WHERE prev_tid = %(tid)s
- AND tid <= %(pack_tid)s
- """
- self._run_script_stmt(cursor, stmt,
- {'pack_tid': pack_tid, 'tid': tid})
+ # Terminate prev_tid chains
+ stmt = """
+ UPDATE object_state SET prev_tid = 0
+ WHERE prev_tid = %(tid)s
+ AND tid <= %(pack_tid)s
+ """
+ self._run_script_stmt(cursor, stmt,
+ {'pack_tid': pack_tid, 'tid': tid})
- # Find out whether the transaction can be removed
+ # Find out whether the transaction is empty
stmt = self._scripts['transaction_has_data']
self._run_script_stmt(cursor, stmt, {'tid': tid})
- has_data = list(cursor)
+ empty = not list(cursor)
- if has_data:
- stmt = """
- UPDATE transaction SET packed = %(TRUE)s
- WHERE tid = %(tid)s
- AND packed = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
+ # mark the transaction packed and possibly empty
+ if empty:
+ clause = 'empty = %(TRUE)s'
+ state = 'empty'
else:
- stmt = """
- DELETE FROM object_refs_added
- WHERE tid = %(tid)s;
- DELETE FROM transaction
- WHERE tid = %(tid)s
- """
- self._run_script(cursor, stmt, {'tid': tid})
- deleted += cursor.rowcount
+ clause = 'empty = %(FALSE)s'
+ state = 'not empty'
+ stmt = "UPDATE transaction SET packed = %(TRUE)s, " + clause
+ stmt += " WHERE tid = %(tid)s"
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
- log.debug("pack: transaction %d: removed %d row(s)", tid, deleted)
+ log.debug(
+ "pack: transaction %d (%s): removed %d object(s) and %d state(s)",
+ tid, state,
+ counters.get('current_object', 0),
+ counters.get('object_state', 0))
+ def _pack_cleanup(self, conn, cursor):
+ """Remove unneeded table rows after packing"""
+ # commit the work done so far
+ conn.commit()
+ self._release_commit_lock(cursor)
+ self._hold_commit_lock(cursor)
+ log.info("pack: removing empty packed transactions")
+ stmt = """
+ DELETE FROM transaction
+ WHERE packed = %(TRUE)s
+ AND empty = %(TRUE)s
+ """
+ self._run_script_stmt(cursor, stmt)
+
+ # perform cleanup that does not require the commit lock
+ conn.commit()
+ self._release_commit_lock(cursor)
+
+ log.debug("pack: clearing temporary pack state")
+ for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
+ stmt = '%(TRUNCATE)s ' + _table
+ self._run_script_stmt(cursor, stmt)
+
+ log.debug("pack: removing unused object references")
+ stmt = """
+ DELETE FROM object_ref
+ WHERE tid IN (
+ SELECT tid
+ FROM transaction
+ WHERE empty = %(TRUE)s
+ )
+ """
+ self._run_script_stmt(cursor, stmt)
+
+ stmt = """
+ DELETE FROM object_refs_added
+ WHERE tid IN (
+ SELECT tid
+ FROM transaction
+ WHERE empty = %(TRUE)s
+ )
+ """
+ self._run_script_stmt(cursor, stmt)
+
+
def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
"""Polls for new transactions.
Modified: relstorage/branches/1.1/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2008-04-01 07:00:46 UTC (rev 85056)
+++ relstorage/branches/1.1/relstorage/adapters/postgresql.py 2008-04-08 07:58:59 UTC (rev 85160)
@@ -41,6 +41,7 @@
CREATE TABLE transaction (
tid BIGINT NOT NULL PRIMARY KEY,
packed BOOLEAN NOT NULL DEFAULT FALSE,
+ empty BOOLEAN NOT NULL DEFAULT FALSE,
username BYTEA NOT NULL,
description BYTEA NOT NULL,
extension BYTEA
@@ -74,6 +75,7 @@
tid BIGINT NOT NULL,
FOREIGN KEY (zoid, tid) REFERENCES object_state
);
+ CREATE INDEX current_object_tid ON current_object (tid);
-- During packing, an exclusive lock is held on pack_lock.
CREATE TABLE pack_lock ();
@@ -87,11 +89,9 @@
CREATE TABLE object_ref (
zoid BIGINT NOT NULL,
tid BIGINT NOT NULL,
- to_zoid BIGINT NOT NULL
+ to_zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid, to_zoid)
);
- CREATE INDEX object_ref_from ON object_ref (zoid);
- CREATE INDEX object_ref_tid ON object_ref (tid);
- CREATE INDEX object_ref_to ON object_ref (to_zoid);
-- The object_refs_added table tracks whether object_refs has
-- been populated for all states in a given transaction.
@@ -122,6 +122,19 @@
WHERE keep = false;
CREATE INDEX pack_object_keep_true ON pack_object (zoid, keep_tid)
WHERE keep = true;
+
+ -- Temporary state during packing: the list of object states to pack.
+ CREATE TABLE pack_state (
+ tid BIGINT NOT NULL,
+ zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid)
+ );
+
+ -- Temporary state during packing: the list of transactions that
+ -- have at least one object state to pack.
+ CREATE TABLE pack_state_tid (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
"""
cursor.execute(stmt)
Modified: relstorage/branches/1.1/relstorage/tests/packstresstest.py
===================================================================
--- relstorage/trunk/relstorage/tests/packstresstest.py 2008-04-01 07:00:46 UTC (rev 85056)
+++ relstorage/branches/1.1/relstorage/tests/packstresstest.py 2008-04-08 07:58:59 UTC (rev 85160)
@@ -40,8 +40,6 @@
print 'size:'
print d.getSize()
- import pdb; pdb.set_trace()
-
print 'packing...'
d.pack()
More information about the Checkins
mailing list