[Checkins] SVN: relstorage/trunk/relstorage/adapters/ De-duplicated code by moving each adapter's packing and historical iteration code into common.Adapter.
Shane Hathaway
shane at hathawaymix.org
Thu Feb 14 02:34:15 EST 2008
Log message for revision 83818:
De-duplicated code by moving each adapter's packing and historical iteration code into common.Adapter.
This wasn't a good idea until the adapter implementations became fairly stable and well understood.
Changed:
U relstorage/trunk/relstorage/adapters/common.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
-=-
Modified: relstorage/trunk/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py 2008-02-14 07:26:59 UTC (rev 83817)
+++ relstorage/trunk/relstorage/adapters/common.py 2008-02-14 07:34:15 UTC (rev 83818)
@@ -19,9 +19,70 @@
class Adapter(object):
+ """Common code for a database adapter.
+ This is an abstract class; a lot of methods are expected to be
+ provided by subclasses.
+ """
+
+ # _script_vars contains replacements for statements in scripts.
+ # These are correct for PostgreSQL and MySQL but not for Oracle.
+ _script_vars = {
+ 'TRUE': 'TRUE',
+ 'FALSE': 'FALSE',
+ 'OCTET_LENGTH': 'OCTET_LENGTH',
+ 'oid': '%(oid)s',
+ 'tid': '%(tid)s',
+ 'pack_tid': '%(pack_tid)s',
+ 'undo_tid': '%(undo_tid)s',
+ 'self_tid': '%(self_tid)s',
+ }
+
+ _scripts = {
+ 'select_keep_tid': """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = pack_object.zoid
+ AND tid > 0
+ AND tid <= %(pack_tid)s
+ ORDER BY tid DESC
+ LIMIT 1
+ """,
+
+ 'choose_pack_transaction': """
+ SELECT tid
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND packed = FALSE
+ ORDER BY tid DESC
+ LIMIT 1
+ """,
+ }
+
+
+ def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ Subclasses may override this.
+ The input statement is generic and needs to be transformed
+ into a database-specific statement.
+ """
+ stmt = generic_stmt % self._script_vars
+ try:
+ cursor.execute(stmt, generic_params)
+ except:
+ log.warning("script statement failed: %r; parameters: %r",
+ stmt, generic_params)
+ raise
+
+
def _run_script(self, cursor, script, params=()):
- """Execute a series of statements in the database."""
+ """Execute a series of statements in the database.
+
+ The statements are transformed by _run_script_stmt
+ before execution.
+ """
lines = []
for line in script.split('\n'):
line = line.strip()
@@ -31,19 +92,405 @@
line = line[:-1]
lines.append(line)
stmt = '\n'.join(lines)
- try:
- cursor.execute(stmt, params)
- except:
- log.warning("script statement failed: %s", stmt)
- raise
+ self._run_script_stmt(cursor, stmt, params)
lines = []
else:
lines.append(line)
if lines:
+ stmt = '\n'.join(lines)
+ self._run_script_stmt(cursor, stmt, params)
+
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log.
+
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension
+ FROM transaction
+ WHERE packed = %(FALSE)s
+ AND tid != 0
+ ORDER BY tid DESC
+ """
+ self._run_script_stmt(cursor, stmt)
+ return iter(cursor)
+
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT 1 FROM current_object WHERE zoid = %(oid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'oid': oid})
+ if not cursor.fetchall():
+ raise KeyError(oid)
+
+ stmt = """
+ SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
+ FROM transaction
+ JOIN object_state USING (tid)
+ WHERE zoid = %(oid)s
+ AND packed = %(FALSE)s
+ ORDER BY tid DESC
+ """
+ self._run_script_stmt(cursor, stmt, {'oid': oid})
+ return iter(cursor)
+
+
+ def choose_pack_transaction(self, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ conn, cursor = self.open()
+ try:
+ stmt = self._scripts['choose_pack_transaction']
+ self._run_script(cursor, stmt, {'tid': pack_point})
+ rows = cursor.fetchall()
+ if not rows:
+ # Nothing needs to be packed.
+ return None
+ return rows[0][0]
+ finally:
+ self.close(conn, cursor)
+
+
+ def pre_pack(self, pack_tid, get_references, gc=True):
+ """Decide what to pack.
+
+ Subclasses may override this.
+
+ tid specifies the most recent transaction to pack.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+
+ gc is a boolean indicating whether to run garbage collection.
+ If gc is false, at least one revision of every object is kept,
+ even if nothing refers to it. Packing with gc disabled can be
+ much faster.
+ """
+ conn, cursor = self.open()
+ try:
try:
- stmt = '\n'.join(lines)
- cursor.execute(stmt, params)
+ if gc:
+ self._pre_pack_with_gc(cursor, pack_tid, get_references)
+ else:
+ self._pre_pack_without_gc(cursor, pack_tid)
except:
- log.warning("script statement failed: %s", stmt)
+ conn.rollback()
raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+ def _pre_pack_without_gc(self, cursor, pack_tid):
+ """Determine what to pack, without garbage collection.
+
+ With garbage collection disabled, there is no need to follow
+ object references.
+ """
+ # Fill the pack_object table with OIDs, but configure them
+ # all to be kept by setting keep and keep_tid.
+ stmt = """
+ DELETE FROM pack_object;
+
+ INSERT INTO pack_object (zoid, keep)
+ SELECT DISTINCT zoid, %(TRUE)s
+ FROM object_state
+ WHERE tid <= %(pack_tid)s;
+
+ UPDATE pack_object SET keep_tid = (@select_keep_tid@)
+ """
+ stmt = stmt.replace(
+ '@select_keep_tid@', self._scripts['select_keep_tid'])
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+
+ def _pre_pack_with_gc(self, cursor, pack_tid, get_references):
+ """Determine what to pack, with garbage collection.
+ """
+ # Fill object_ref with references from object states
+ # in transactions that will not be packed.
+ self._fill_nonpacked_refs(cursor, pack_tid, get_references)
+
+ # Fill the pack_object table with OIDs that either will be
+ # removed (if nothing references the OID) or whose history will
+ # be cut.
+ stmt = """
+ DELETE FROM pack_object;
+
+ INSERT INTO pack_object (zoid, keep)
+ SELECT DISTINCT zoid, %(FALSE)s
+ FROM object_state
+ WHERE tid <= %(pack_tid)s;
+
+ -- If the root object is in pack_object, keep it.
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE zoid = 0;
+
+ -- Keep objects that have been revised since pack_tid.
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(pack_tid)s
+ );
+
+ -- Keep objects that are still referenced by object states in
+ -- transactions that will not be packed.
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT to_zoid
+ FROM object_ref
+ WHERE tid > %(pack_tid)s
+ );
+ """
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+ self._create_temp_pack_visit(cursor)
+
+ # Each of the packable objects to be kept might
+ # refer to other objects. If some of those references
+ # include objects currently set to be removed, keep
+ # those objects as well. Do this
+ # repeatedly until all references have been satisfied.
+ while True:
+
+ # Make a list of all parent objects that still need
+ # to be visited. Then set keep_tid for all pack_object
+ # rows with keep = true.
+ # keep_tid must be set before _fill_pack_object_refs examines
+ # references.
+ stmt = """
+ DELETE FROM temp_pack_visit;
+
+ INSERT INTO temp_pack_visit (zoid)
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %(TRUE)s
+ AND keep_tid IS NULL;
+
+ UPDATE pack_object SET keep_tid = (@select_keep_tid@)
+ WHERE keep = %(TRUE)s AND keep_tid IS NULL
+ """
+ stmt = stmt.replace(
+ '@select_keep_tid@', self._scripts['select_keep_tid'])
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+ self._fill_pack_object_refs(cursor, get_references)
+
+ # Visit the children of all parent objects that were
+ # just visited.
+ stmt = """
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN temp_pack_visit USING (zoid)
+ )
+ """
+ self._run_script_stmt(cursor, stmt)
+ if not cursor.rowcount:
+ # No new references detected.
+ break
+
+
+ def _create_temp_pack_visit(self, cursor):
+ """Create a workspace for listing objects to visit.
+
+ Subclasses can override this.
+ """
+ stmt = """
+ CREATE TEMPORARY TABLE temp_pack_visit (
+ zoid BIGINT NOT NULL
+ );
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+ """
+ self._run_script(cursor, stmt)
+
+
+ def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
+ """Fill object_ref for all transactions that will not be packed."""
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid > %(pack_tid)s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_refs_added
+ WHERE tid = object_state.tid
+ )
+ """
+ self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
+ for (tid,) in cursor.fetchall():
+ self._add_refs_for_tid(cursor, tid, get_references)
+
+
+ def _fill_pack_object_refs(self, cursor, get_references):
+ """Fill object_ref for all pack_object rows that have keep_tid."""
+ stmt = """
+ SELECT DISTINCT keep_tid
+ FROM pack_object
+ WHERE keep_tid IS NOT NULL
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_refs_added
+ WHERE tid = keep_tid
+ )
+ """
+ cursor.execute(stmt)
+ for (tid,) in cursor.fetchall():
+ self._add_refs_for_tid(cursor, tid, get_references)
+
+
+ def _add_object_ref_rows(self, cursor, add_rows):
+ """Add rows to object_ref.
+
+ The input rows are tuples containing (from_zoid, tid, to_zoid).
+
+ Subclasses can override this.
+ """
+ stmt = """
+ INSERT INTO object_ref (zoid, tid, to_zoid)
+ VALUES (%s, %s, %s)
+ """
+ cursor.executemany(stmt, add_rows)
+
+
+ def _add_refs_for_tid(self, cursor, tid, get_references):
+ """Fill object_refs with all states for a transaction.
+ """
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+ add_rows = [] # [(from_oid, tid, to_oid)]
+ for from_oid, state in cursor:
+ if hasattr(state, 'read'):
+ # cx_Oracle detail
+ state = state.read()
+ if state:
+ to_oids = get_references(str(state))
+ for to_oid in to_oids:
+ add_rows.append((from_oid, tid, to_oid))
+
+ if add_rows:
+ self._add_object_ref_rows(cursor, add_rows)
+
+ # The references have been computed for this transaction.
+ stmt = """
+ INSERT INTO object_refs_added (tid)
+ VALUES (%(tid)s)
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+
+ def _hold_commit_lock(self, cursor):
+ """Hold the commit lock for packing"""
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+
+ def pack(self, pack_tid):
+ """Pack. Requires populated pack tables."""
+
+ # Read committed mode is sufficient.
+ conn, cursor = self.open()
+ try:
+ try:
+ # hold the commit lock for a moment to prevent deadlocks.
+ self._hold_commit_lock(cursor)
+
+ for table in ('object_ref', 'current_object', 'object_state'):
+
+ # Remove objects that are in pack_object and have keep
+ # set to false.
+ stmt = """
+ DELETE FROM %s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %%(FALSE)s
+ )
+ """ % table
+ self._run_script_stmt(cursor, stmt)
+
+ if table != 'current_object':
+ # Cut the history of objects in pack_object that
+ # have keep set to true.
+ stmt = """
+ DELETE FROM %s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %%(TRUE)s
+ )
+ AND tid < (
+ SELECT keep_tid
+ FROM pack_object
+ WHERE zoid = %s.zoid
+ )
+ """ % (table, table)
+ self._run_script_stmt(cursor, stmt)
+
+ stmt = """
+ -- Terminate prev_tid chains
+ UPDATE object_state SET prev_tid = 0
+ WHERE tid <= %(pack_tid)s
+ AND prev_tid != 0;
+
+ -- For each tid to be removed, delete the corresponding row in
+ -- object_refs_added.
+ DELETE FROM object_refs_added
+ WHERE tid > 0
+ AND tid <= %(pack_tid)s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_state
+ WHERE tid = object_refs_added.tid
+ );
+
+ -- Delete transactions no longer used.
+ DELETE FROM transaction
+ WHERE tid > 0
+ AND tid <= %(pack_tid)s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_state
+ WHERE tid = transaction.tid
+ );
+
+ -- Mark the remaining packable transactions as packed
+ UPDATE transaction SET packed = %(TRUE)s
+ WHERE tid > 0
+ AND tid <= %(pack_tid)s
+ AND packed = %(FALSE)s;
+
+ -- Clean up.
+ DELETE FROM pack_object;
+ """
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+ except:
+ conn.rollback()
+ raise
+
+ else:
+ conn.commit()
+
+ finally:
+ self.close(conn, cursor)
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2008-02-14 07:26:59 UTC (rev 83817)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2008-02-14 07:34:15 UTC (rev 83818)
@@ -505,48 +505,6 @@
return oid
- def iter_transactions(self, cursor):
- """Iterate over the transaction log.
-
- Yields (tid, username, description, extension) for each transaction.
- """
- stmt = """
- SELECT tid, username, description, extension
- FROM transaction
- WHERE packed = FALSE
- AND tid != 0
- ORDER BY tid DESC
- """
- cursor.execute(stmt)
- return iter(cursor)
-
-
- def iter_object_history(self, cursor, oid):
- """Iterate over an object's history.
-
- Raises KeyError if the object does not exist.
- Yields (tid, username, description, extension, pickle_size)
- for each modification.
- """
- stmt = """
- SELECT 1 FROM current_object WHERE zoid = %s
- """
- cursor.execute(stmt, (oid,))
- if not cursor.rowcount:
- raise KeyError(oid)
-
- stmt = """
- SELECT tid, username, description, extension, OCTET_LENGTH(state)
- FROM transaction
- JOIN object_state USING (tid)
- WHERE zoid = %s
- AND packed = FALSE
- ORDER BY tid DESC
- """
- cursor.execute(stmt, (oid,))
- return iter(cursor)
-
-
def hold_pack_lock(self, cursor):
"""Try to acquire the pack lock.
@@ -558,6 +516,7 @@
if not res:
raise StorageError('A pack or undo operation is in progress')
+
def release_pack_lock(self, cursor):
"""Release the pack lock."""
stmt = "SELECT RELEASE_LOCK('relstorage.pack')"
@@ -659,330 +618,49 @@
return [oid_int for (oid_int,) in cursor]
- def choose_pack_transaction(self, pack_point):
- """Return the transaction before or at the specified pack time.
-
- Returns None if there is nothing to pack.
- """
- conn, cursor = self.open()
- try:
- stmt = """
- SELECT tid
- FROM transaction
- WHERE tid > 0 AND tid <= %s
- AND packed = FALSE
- ORDER BY tid DESC
- LIMIT 1
- """
- cursor.execute(stmt, (pack_point,))
- if not cursor.rowcount:
- # Nothing needs to be packed.
- return None
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- finally:
- self.close(conn, cursor)
-
-
- def pre_pack(self, pack_tid, get_references):
+ def pre_pack(self, pack_tid, get_references, gc=True):
"""Decide what to pack.
- tid specifies the most recent transaction to pack.
-
- get_references is a function that accepts a pickled state and
- returns a set of OIDs that state refers to.
+ This overrides the method by the same name in common.Adapter.
"""
conn, cursor = self.open(transaction_mode=None)
try:
# This phase of packing works best with transactions
# disabled. It changes no user-facing data.
conn.autocommit(True)
- self._pre_pack_cursor(cursor, pack_tid, get_references)
+ if gc:
+ self._pre_pack_with_gc(cursor, pack_tid, get_references)
+ else:
+ self._pre_pack_without_gc(cursor, pack_tid)
finally:
self.close(conn, cursor)
- def _pre_pack_cursor(self, cursor, pack_tid, get_references):
- """pre_pack implementation.
- """
- # Fill object_ref with references from object states
- # in transactions that will not be packed.
- self._fill_nonpacked_refs(cursor, pack_tid, get_references)
+ def _create_temp_pack_visit(self, cursor):
+ """Create a workspace for listing objects to visit.
- # Ensure the temporary pack_object table is clear.
- cursor.execute("TRUNCATE pack_object")
-
- args = {'pack_tid': pack_tid}
-
- # Fill the pack_object table with OIDs that either will be
- # removed (if nothing references the OID) or whose history will
- # be cut.
- stmt = """
- INSERT INTO pack_object (zoid, keep)
- SELECT DISTINCT zoid, false
- FROM object_state
- WHERE tid <= %(pack_tid)s
+ This overrides the method by the same name in common.Adapter.
"""
- cursor.execute(stmt, args)
-
- # If the root object is in pack_object, keep it.
stmt = """
- UPDATE pack_object SET keep = true
- WHERE zoid = 0
- """
- cursor.execute(stmt)
-
- # Keep objects that have been revised since pack_tid.
- stmt = """
- UPDATE pack_object SET keep = true
- WHERE keep = false
- AND zoid IN (
- SELECT zoid
- FROM current_object
- WHERE tid > %(pack_tid)s
- )
- """
- cursor.execute(stmt, args)
-
- # Keep objects that are still referenced by object states in
- # transactions that will not be packed.
- stmt = """
- UPDATE pack_object SET keep = true
- WHERE keep = false
- AND zoid IN (
- SELECT to_zoid
- FROM object_ref
- WHERE tid > %(pack_tid)s
- )
- """
- cursor.execute(stmt, args)
-
- # Create a small workspace
- stmt = """
CREATE TEMPORARY TABLE temp_pack_visit (
zoid BIGINT NOT NULL PRIMARY KEY
)
"""
cursor.execute(stmt)
- # Each of the packable objects to be kept might
- # refer to other objects. If some of those references
- # include objects currently set to be removed, keep
- # those objects as well. Do this
- # repeatedly until all references have been satisfied.
- while True:
- # Make a list of all parent objects that still need
- # to be visited.
- cursor.execute("DELETE FROM temp_pack_visit")
- stmt = """
- INSERT INTO temp_pack_visit (zoid)
- SELECT zoid
- FROM pack_object
- WHERE keep = true
- AND keep_tid IS NULL
- """
- cursor.execute(stmt)
+ def _hold_commit_lock(self, cursor):
+ """Hold the commit lock for packing.
- # Set keep_tid for all pack_object rows with keep = 'Y'.
- # This must be done before _fill_pack_object_refs examines
- # references.
- stmt = """
- UPDATE pack_object SET keep_tid = (
- SELECT tid
- FROM object_state
- WHERE zoid = pack_object.zoid
- AND tid > 0
- AND tid <= %(pack_tid)s
- ORDER BY tid DESC
- LIMIT 1
- )
- WHERE keep = true
- AND keep_tid IS NULL
- """
- cursor.execute(stmt, args)
-
- self._fill_pack_object_refs(cursor, get_references)
-
- # Visit the children of all parent objects that were
- # just visited.
- stmt = """
- UPDATE pack_object SET keep = true
- WHERE keep = false
- AND zoid IN (
- SELECT DISTINCT to_zoid
- FROM object_ref
- JOIN temp_pack_visit USING (zoid)
- )
- """
- cursor.execute(stmt)
- if not cursor.rowcount:
- # No new references detected.
- break
-
-
- def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
- """Fill object_ref for all transactions that will not be packed."""
- stmt = """
- SELECT DISTINCT tid
- FROM object_state
- WHERE tid > %s
- AND NOT EXISTS (
- SELECT 1
- FROM object_refs_added
- WHERE tid = object_state.tid
- )
+ This overrides the method by the same name in common.Adapter.
"""
- cursor.execute(stmt, (pack_tid,))
- for (tid,) in cursor.fetchall():
- self._add_refs_for_tid(cursor, tid, get_references)
+ cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
+ (commit_lock_timeout,))
+ locked = cursor.fetchone()[0]
+ if not locked:
+ raise StorageError("Unable to acquire commit lock")
- def _fill_pack_object_refs(self, cursor, get_references):
- """Fill object_ref for all pack_object rows that have keep_tid."""
- stmt = """
- SELECT DISTINCT keep_tid
- FROM pack_object
- WHERE keep_tid IS NOT NULL
- AND NOT EXISTS (
- SELECT 1
- FROM object_refs_added
- WHERE tid = keep_tid
- )
- """
- cursor.execute(stmt)
- for (tid,) in cursor.fetchall():
- self._add_refs_for_tid(cursor, tid, get_references)
-
-
- def _add_refs_for_tid(self, cursor, tid, get_references):
- """Fills object_refs with all states for a transaction.
- """
- stmt = """
- SELECT zoid, state
- FROM object_state
- WHERE tid = %s
- """
- cursor.execute(stmt, (tid,))
-
- to_add = [] # [(from_oid, tid, to_oid)]
- for from_oid, state in cursor:
- if state:
- to_oids = get_references(state)
- for to_oid in to_oids:
- to_add.append((from_oid, tid, to_oid))
-
- if to_add:
- stmt = """
- INSERT INTO object_ref (zoid, tid, to_zoid)
- VALUES (%s, %s, %s)
- """
- cursor.executemany(stmt, to_add)
-
- # The references have been computed for this transaction.
- stmt = """
- INSERT INTO object_refs_added (tid)
- VALUES (%s)
- """
- cursor.execute(stmt, (tid,))
-
-
- def pack(self, pack_tid):
- """Pack. Requires populated pack tables."""
-
- # Read committed mode is sufficient.
- conn, cursor = self.open()
- try:
- # Pause concurrent commits.
- cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
- (commit_lock_timeout,))
- locked = cursor.fetchone()[0]
- if not locked:
- raise StorageError("Unable to acquire commit lock")
-
- try:
-
- for table in ('object_ref', 'current_object', 'object_state'):
-
- # Remove objects that are in pack_object and have keep
- # set to false.
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = false
- )
- """ % table
- cursor.execute(stmt)
-
- if table != 'current_object':
- # Cut the history of objects in pack_object that
- # have keep set to true.
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = true
- )
- AND tid < (
- SELECT keep_tid
- FROM pack_object
- WHERE zoid = %s.zoid
- )
- """ % (table, table)
- cursor.execute(stmt)
-
- stmt = """
- -- Terminate prev_tid chains
- UPDATE object_state SET prev_tid = 0
- WHERE tid <= %(tid)s
- AND prev_tid != 0;
-
- -- For each tid to be removed, delete the corresponding row in
- -- object_refs_added.
- DELETE FROM object_refs_added
- WHERE tid > 0
- AND tid <= %(tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = object_refs_added.tid
- );
-
- -- Delete transactions no longer used.
- DELETE FROM transaction
- WHERE tid > 0
- AND tid <= %(tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = transaction.tid
- );
-
- -- Mark the remaining packable transactions as packed
- UPDATE transaction SET packed = true
- WHERE tid > 0
- AND tid <= %(tid)s
- AND packed = false
- """
- self._run_script(cursor, stmt, {'tid': pack_tid})
-
- # Clean up
- cursor.execute("TRUNCATE pack_object")
-
- except:
- conn.rollback()
- raise
-
- else:
- conn.commit()
-
- finally:
- self.close(conn, cursor)
-
-
def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
"""Polls for new transactions.
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2008-02-14 07:26:59 UTC (rev 83817)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2008-02-14 07:34:15 UTC (rev 83818)
@@ -28,11 +28,68 @@
class OracleAdapter(Adapter):
"""Oracle adapter for RelStorage."""
+ _script_vars = {
+ 'TRUE': "'Y'",
+ 'FALSE': "'N'",
+ 'OCTET_LENGTH': 'LENGTH',
+ 'oid': ':oid',
+ 'tid': ':tid',
+ 'pack_tid': ':pack_tid',
+ 'undo_tid': ':undo_tid',
+ 'self_tid': ':self_tid',
+ }
+
+ _scripts = {
+ 'select_keep_tid': """
+ SELECT MAX(tid)
+ FROM object_state
+ WHERE zoid = pack_object.zoid
+ AND tid > 0
+ AND tid <= %(pack_tid)s
+ """,
+
+ 'choose_pack_transaction': """
+ SELECT MAX(tid)
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND packed = 'N'
+ """,
+ }
+
def __init__(self, user, password, dsn, twophase=False, arraysize=64):
self._params = (user, password, dsn)
self._twophase = twophase
self._arraysize = arraysize
+
+ def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ This overrides the method by the same name in common.Adapter.
+ """
+ if generic_params:
+ # Oracle raises ORA-01036 if the parameter map contains extra keys,
+ # so filter out any unused parameters.
+ tracker = TrackingMap(self._script_vars)
+ stmt = generic_stmt % tracker
+ used = tracker.used
+ params = {}
+ for k, v in generic_params.iteritems():
+ if k in used:
+ params[k] = v
+ else:
+ stmt = generic_stmt % self._script_vars
+ params = ()
+
+ try:
+ cursor.execute(stmt, params)
+ except:
+ log.warning("script statement failed: %r; parameters: %r",
+ stmt, params)
+ raise
+
+
def create_schema(self, cursor):
"""Create the database tables."""
stmt = """
@@ -525,48 +582,6 @@
return cursor.fetchone()[0]
- def iter_transactions(self, cursor):
- """Iterate over the transaction log.
-
- Yields (tid, username, description, extension) for each transaction.
- """
- stmt = """
- SELECT tid, username, description, extension
- FROM transaction
- WHERE packed = 'N'
- AND tid != 0
- ORDER BY tid DESC
- """
- cursor.execute(stmt)
- return iter(cursor)
-
-
- def iter_object_history(self, cursor, oid):
- """Iterate over an object's history.
-
- Raises KeyError if the object does not exist.
- Yields (tid, username, description, extension, pickle_size)
- for each modification.
- """
- stmt = """
- SELECT 1 FROM current_object WHERE zoid = :1
- """
- cursor.execute(stmt, (oid,))
- if not cursor.fetchall():
- raise KeyError(oid)
-
- stmt = """
- SELECT tid, username, description, extension, LENGTH(state)
- FROM transaction
- JOIN object_state USING (tid)
- WHERE zoid = :1
- AND packed = 'N'
- ORDER BY tid DESC
- """
- cursor.execute(stmt, (oid,))
- return iter(cursor)
-
-
def hold_pack_lock(self, cursor):
"""Try to acquire the pack lock.
@@ -595,8 +610,6 @@
if not cursor.fetchall():
raise UndoError("Transaction not found or packed")
- self.hold_pack_lock(cursor)
-
# Rule: we can undo an object if the object's state in the
# transaction to undo matches the object's current state.
# If any object in the transaction does not fit that rule,
@@ -650,347 +663,61 @@
LEFT JOIN object_state prev
ON (prev.zoid = undoing.zoid
AND prev.tid = undoing.prev_tid)
- WHERE undoing.tid = :undo_tid
+ WHERE undoing.tid = %(undo_tid)s
AND undoing.zoid = object_state.zoid
)
- WHERE tid = :self_tid
+ WHERE tid = %(self_tid)s
AND zoid IN (
- SELECT zoid FROM object_state WHERE tid = :undo_tid);
+ SELECT zoid FROM object_state WHERE tid = %(undo_tid)s);
-- Add new undo records.
INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT undoing.zoid, :self_tid, current_object.tid,
+ SELECT undoing.zoid, %(self_tid)s, current_object.tid,
prev.md5, prev.state
FROM object_state undoing
JOIN current_object ON (current_object.zoid = undoing.zoid)
LEFT JOIN object_state prev
ON (prev.zoid = undoing.zoid
AND prev.tid = undoing.prev_tid)
- WHERE undoing.tid = :undo_tid
+ WHERE undoing.tid = %(undo_tid)s
AND undoing.zoid NOT IN (
- SELECT zoid FROM object_state WHERE tid = :self_tid);
+ SELECT zoid FROM object_state WHERE tid = %(self_tid)s);
+
+ -- List the changed OIDs.
+
+ SELECT zoid FROM object_state WHERE tid = %(undo_tid)s
"""
self._run_script(cursor, stmt,
{'undo_tid': undo_tid, 'self_tid': self_tid})
- # List the changed OIDs.
- stmt = "SELECT zoid FROM object_state WHERE tid = :1"
- cursor.execute(stmt, (undo_tid,))
return [oid_int for (oid_int,) in cursor]
- def choose_pack_transaction(self, pack_point):
- """Return the transaction before or at the specified pack time.
+ def _create_temp_pack_visit(self, cursor):
+ """Create a workspace for listing objects to visit.
- Returns None if there is nothing to pack.
+ This overrides the method by the same name in common.Adapter.
"""
- conn, cursor = self.open()
- try:
- stmt = """
- SELECT MAX(tid)
- FROM transaction
- WHERE tid > 0
- AND tid <= :1
- AND packed = 'N'
- """
- cursor.execute(stmt, (pack_point,))
- rows = cursor.fetchall()
- if not rows:
- # Nothing needs to be packed.
- return None
- return rows[0][0]
- finally:
- self.close(conn, cursor)
+ # The temp_pack_visit table is a global temporary table,
+ # so it does not need to be created here.
+ pass
- def pre_pack(self, pack_tid, get_references):
- """Decide what to pack.
+ def _add_object_ref_rows(self, cursor, add_rows):
+ """Add rows to object_ref.
- tid specifies the most recent transaction to pack.
+ The input rows are tuples containing (from_zoid, tid, to_zoid).
- get_references is a function that accepts a pickled state and
- returns a set of OIDs that state refers to.
+ This overrides the method by the same name in common.Adapter.
"""
- conn, cursor = self.open()
- try:
- try:
- self._pre_pack_cursor(cursor, pack_tid, get_references)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def _pre_pack_cursor(self, cursor, pack_tid, get_references):
- """pre_pack implementation.
- """
- self._fill_nonpacked_refs(cursor, pack_tid, get_references)
-
- args = {'pack_tid': pack_tid}
-
- # Ensure the temporary pack_object table is clear.
- cursor.execute("DELETE FROM pack_object")
-
- # Fill the pack_object table with OIDs that either will be
- # removed (if nothing references the OID) or whose history will
- # be cut.
stmt = """
- INSERT INTO pack_object (zoid, keep)
- SELECT DISTINCT zoid, 'N'
- FROM object_state
- WHERE tid <= :pack_tid
+ INSERT INTO object_ref (zoid, tid, to_zoid)
+ VALUES (:1, :2, :3)
"""
- cursor.execute(stmt, args)
+ cursor.executemany(stmt, add_rows)
- # If the root object is in pack_object, keep it.
- stmt = """
- UPDATE pack_object SET keep = 'Y'
- WHERE zoid = 0
- """
- cursor.execute(stmt)
- # Keep objects that have been revised since pack_tid.
- stmt = """
- UPDATE pack_object SET keep = 'Y'
- WHERE keep = 'N'
- AND zoid IN (
- SELECT zoid
- FROM current_object
- WHERE tid > :pack_tid
- )
- """
- cursor.execute(stmt, args)
-
- # Keep objects that are still referenced by object states in
- # transactions that will not be packed.
- stmt = """
- UPDATE pack_object SET keep = 'Y'
- WHERE keep = 'N'
- AND zoid IN (
- SELECT to_zoid
- FROM object_ref
- WHERE tid > :pack_tid
- )
- """
- cursor.execute(stmt, args)
-
- # Each of the packable objects to be kept might
- # refer to other objects. If some of those references
- # include objects currently set to be removed, keep
- # those objects as well. Do this
- # repeatedly until all references have been satisfied.
- while True:
-
- # Make a list of all parent objects that still need
- # to be visited.
- cursor.execute("DELETE FROM temp_pack_visit")
- stmt = """
- INSERT INTO temp_pack_visit (zoid)
- SELECT zoid
- FROM pack_object
- WHERE keep = 'Y'
- AND keep_tid IS NULL
- """
- cursor.execute(stmt)
-
- # Set keep_tid for all pack_object rows with keep = 'Y'.
- # This must be done before _fill_pack_object_refs examines
- # references.
- stmt = """
- UPDATE pack_object SET keep_tid = (
- SELECT MAX(tid)
- FROM object_state
- WHERE zoid = pack_object.zoid
- AND tid > 0
- AND tid <= :pack_tid
- )
- WHERE keep = 'Y'
- AND keep_tid IS NULL
- """
- cursor.execute(stmt, args)
-
- self._fill_pack_object_refs(cursor, get_references)
-
- # Visit the children of all parent objects that were
- # just visited.
- stmt = """
- UPDATE pack_object SET keep = 'Y'
- WHERE keep = 'N'
- AND zoid IN (
- SELECT DISTINCT to_zoid
- FROM object_ref
- JOIN temp_pack_visit USING (zoid)
- )
- """
- cursor.execute(stmt)
- if not cursor.rowcount:
- # No new references detected.
- break
-
-
- def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
- """Fill object_ref for all transactions that will not be packed."""
- stmt = """
- SELECT DISTINCT tid
- FROM object_state
- WHERE tid > :1
- AND NOT EXISTS (
- SELECT 1
- FROM object_refs_added
- WHERE tid = object_state.tid
- )
- """
- cursor.execute(stmt, (pack_tid,))
- for (tid,) in cursor.fetchall():
- self._add_refs_for_tid(cursor, tid, get_references)
-
-
- def _fill_pack_object_refs(self, cursor, get_references):
- """Fill object_ref for all pack_object rows that have keep_tid."""
- stmt = """
- SELECT DISTINCT keep_tid
- FROM pack_object
- WHERE keep_tid IS NOT NULL
- AND NOT EXISTS (
- SELECT 1
- FROM object_refs_added
- WHERE tid = keep_tid
- )
- """
- cursor.execute(stmt)
- for (tid,) in cursor.fetchall():
- self._add_refs_for_tid(cursor, tid, get_references)
-
-
- def _add_refs_for_tid(self, cursor, tid, get_references):
- """Fills object_refs with all states for a transaction.
- """
- stmt = """
- SELECT zoid, state
- FROM object_state
- WHERE tid = :1
- """
- cursor.execute(stmt, (tid,))
-
- to_add = [] # [(from_oid, tid, to_oid)]
- for from_oid, state_file in cursor:
- if state_file is not None:
- state = state_file.read()
- if state is not None:
- to_oids = get_references(state)
- for to_oid in to_oids:
- to_add.append((from_oid, tid, to_oid))
-
- if to_add:
- stmt = """
- INSERT INTO object_ref (zoid, tid, to_zoid)
- VALUES (:1, :2, :3)
- """
- cursor.executemany(stmt, to_add)
-
- # The references have been computed for this transaction.
- stmt = """
- INSERT INTO object_refs_added (tid)
- VALUES (:1)
- """
- cursor.execute(stmt, (tid,))
-
-
- def pack(self, pack_tid):
- """Pack. Requires populated pack tables."""
-
- # Read committed mode is sufficient.
- conn, cursor = self.open()
- try:
- try:
- # hold the commit lock for a moment to prevent deadlocks.
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
-
- for table in ('object_ref', 'current_object', 'object_state'):
-
- # Remove objects that are in pack_object and have keep
- # set to 'N'.
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = 'N'
- )
- """ % table
- cursor.execute(stmt)
-
- if table != 'current_object':
- # Cut the history of objects in pack_object that
- # have keep set to 'Y'.
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = 'Y'
- )
- AND tid < (
- SELECT keep_tid
- FROM pack_object
- WHERE zoid = %s.zoid
- )
- """ % (table, table)
- cursor.execute(stmt)
-
- stmt = """
- -- Terminate prev_tid chains
- UPDATE object_state SET prev_tid = 0
- WHERE tid <= :tid
- AND prev_tid != 0;
-
- -- For each tid to be removed, delete the corresponding row in
- -- object_refs_added.
- DELETE FROM object_refs_added
- WHERE tid > 0
- AND tid <= :tid
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = object_refs_added.tid
- );
-
- -- Delete transactions no longer used.
- DELETE FROM transaction
- WHERE tid > 0
- AND tid <= :tid
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = transaction.tid
- );
-
- -- Mark the remaining packable transactions as packed
- UPDATE transaction SET packed = 'Y'
- WHERE tid > 0
- AND tid <= :tid
- AND packed = 'N'
- """
- self._run_script(cursor, stmt, {'tid': pack_tid})
-
- # Clean up
- cursor.execute("DELETE FROM pack_object")
-
- except:
- conn.rollback()
- raise
-
- else:
- conn.commit()
-
- finally:
- self.close(conn, cursor)
-
-
def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
"""Polls for new transactions.
@@ -1042,3 +769,15 @@
except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
raise StorageError("database disconnected")
+
+class TrackingMap:
+ """Provides values for keys while tracking which keys are accessed."""
+
+ def __init__(self, source):
+ self.source = source
+ self.used = set()
+
+ def __getitem__(self, key):
+ self.used.add(key)
+ return self.source[key]
+
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2008-02-14 07:26:59 UTC (rev 83817)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2008-02-14 07:34:15 UTC (rev 83818)
@@ -542,48 +542,6 @@
return cursor.fetchone()[0]
- def iter_transactions(self, cursor):
- """Iterate over the transaction log.
-
- Yields (tid, username, description, extension) for each transaction.
- """
- stmt = """
- SELECT tid, username, description, extension
- FROM transaction
- WHERE packed = FALSE
- AND tid != 0
- ORDER BY tid DESC
- """
- cursor.execute(stmt)
- return iter(cursor)
-
-
- def iter_object_history(self, cursor, oid):
- """Iterate over an object's history.
-
- Raises KeyError if the object does not exist.
- Yields (tid, username, description, extension, pickle_size)
- for each modification.
- """
- stmt = """
- SELECT 1 FROM current_object WHERE zoid = %s
- """
- cursor.execute(stmt, (oid,))
- if not cursor.rowcount:
- raise KeyError(oid)
-
- stmt = """
- SELECT tid, username, description, extension, OCTET_LENGTH(state)
- FROM transaction
- JOIN object_state USING (tid)
- WHERE zoid = %s
- AND packed = FALSE
- ORDER BY tid DESC
- """
- cursor.execute(stmt, (oid,))
- return iter(cursor)
-
-
def hold_pack_lock(self, cursor):
"""Try to acquire the pack lock.
@@ -612,8 +570,6 @@
if not cursor.rowcount:
raise UndoError("Transaction not found or packed")
- self.hold_pack_lock(cursor)
-
# Rule: we can undo an object if the object's state in the
# transaction to undo matches the object's current state.
# If any object in the transaction does not fit that rule,
@@ -706,329 +662,6 @@
return [oid_int for (oid_int,) in cursor]
- def choose_pack_transaction(self, pack_point):
- """Return the transaction before or at the specified pack time.
-
- Returns None if there is nothing to pack.
- """
- conn, cursor = self.open()
- try:
- stmt = """
- SELECT tid
- FROM transaction
- WHERE tid > 0 AND tid <= %s
- AND packed = FALSE
- ORDER BY tid DESC
- LIMIT 1
- """
- cursor.execute(stmt, (pack_point,))
- if not cursor.rowcount:
- # Nothing needs to be packed.
- return None
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- finally:
- self.close(conn, cursor)
-
-
- def pre_pack(self, pack_tid, get_references):
- """Decide what to pack.
-
- tid specifies the most recent transaction to pack.
-
- get_references is a function that accepts a pickled state and
- returns a set of OIDs that state refers to.
- """
- conn, cursor = self.open()
- try:
- try:
- self._pre_pack_cursor(cursor, pack_tid, get_references)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def _pre_pack_cursor(self, cursor, pack_tid, get_references):
- """pre_pack implementation.
- """
- # Fill object_ref with references from object states
- # in transactions that will not be packed.
- self._fill_nonpacked_refs(cursor, pack_tid, get_references)
-
- # Ensure the temporary pack_object table is clear.
- cursor.execute("TRUNCATE pack_object")
-
- args = {'pack_tid': pack_tid}
-
- # Fill the pack_object table with OIDs that either will be
- # removed (if nothing references the OID) or whose history will
- # be cut.
- stmt = """
- INSERT INTO pack_object (zoid, keep)
- SELECT DISTINCT zoid, false
- FROM object_state
- WHERE tid <= %(pack_tid)s
- """
- cursor.execute(stmt, args)
-
- # If the root object is in pack_object, keep it.
- stmt = """
- UPDATE pack_object SET keep = true
- WHERE zoid = 0
- """
- cursor.execute(stmt)
-
- # Keep objects that have been revised since pack_tid.
- stmt = """
- UPDATE pack_object SET keep = true
- WHERE keep = false
- AND zoid IN (
- SELECT zoid
- FROM current_object
- WHERE tid > %(pack_tid)s
- )
- """
- cursor.execute(stmt, args)
-
- # Keep objects that are still referenced by object states in
- # transactions that will not be packed.
- stmt = """
- UPDATE pack_object SET keep = true
- WHERE keep = false
- AND zoid IN (
- SELECT to_zoid
- FROM object_ref
- WHERE tid > %(pack_tid)s
- )
- """
- cursor.execute(stmt, args)
-
- # Create a small workspace
- stmt = """
- CREATE TEMPORARY TABLE temp_pack_visit (
- zoid BIGINT NOT NULL
- );
- CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
- """
- cursor.execute(stmt)
-
- # Each of the packable objects to be kept might
- # refer to other objects. If some of those references
- # include objects currently set to be removed, keep
- # those objects as well. Do this
- # repeatedly until all references have been satisfied.
- while True:
-
- # Make a list of all parent objects that still need
- # to be visited.
- stmt = """
- TRUNCATE temp_pack_visit;
- INSERT INTO temp_pack_visit (zoid)
- SELECT zoid
- FROM pack_object
- WHERE keep = true
- AND keep_tid IS NULL
- """
- cursor.execute(stmt)
-
- # Set keep_tid for all pack_object rows with keep = 'Y'.
- # This must be done before _fill_pack_object_refs examines
- # references.
- stmt = """
- UPDATE pack_object SET keep_tid = (
- SELECT tid
- FROM object_state
- WHERE zoid = pack_object.zoid
- AND tid > 0
- AND tid <= %(pack_tid)s
- ORDER BY tid DESC
- LIMIT 1
- )
- WHERE keep = true AND keep_tid IS NULL
- """
- cursor.execute(stmt, args)
-
- self._fill_pack_object_refs(cursor, get_references)
-
- # Visit the children of all parent objects that were
- # just visited.
- stmt = """
- UPDATE pack_object SET keep = true
- WHERE keep = false
- AND zoid IN (
- SELECT DISTINCT to_zoid
- FROM object_ref
- JOIN temp_pack_visit USING (zoid)
- )
- """
- cursor.execute(stmt)
- if not cursor.rowcount:
- # No new references detected.
- break
-
-
- def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
- """Fill object_ref for all transactions that will not be packed."""
- stmt = """
- SELECT DISTINCT tid
- FROM object_state
- WHERE tid > %s
- AND NOT EXISTS (
- SELECT 1
- FROM object_refs_added
- WHERE tid = object_state.tid
- )
- """
- cursor.execute(stmt, (pack_tid,))
- for (tid,) in cursor.fetchall():
- self._add_refs_for_tid(cursor, tid, get_references)
-
-
- def _fill_pack_object_refs(self, cursor, get_references):
- """Fill object_ref for all pack_object rows that have keep_tid."""
- stmt = """
- SELECT DISTINCT keep_tid
- FROM pack_object
- WHERE keep_tid IS NOT NULL
- AND NOT EXISTS (
- SELECT 1
- FROM object_refs_added
- WHERE tid = keep_tid
- )
- """
- cursor.execute(stmt)
- for (tid,) in cursor.fetchall():
- self._add_refs_for_tid(cursor, tid, get_references)
-
-
- def _add_refs_for_tid(self, cursor, tid, get_references):
- """Fills object_refs with all states for a transaction.
- """
- stmt = """
- SELECT zoid, encode(state, 'base64')
- FROM object_state
- WHERE tid = %s
- """
- cursor.execute(stmt, (tid,))
-
- to_add = [] # [(from_oid, tid, to_oid)]
- for from_oid, state64 in cursor:
- if state64 is not None:
- state = decodestring(state64)
- to_oids = get_references(state)
- for to_oid in to_oids:
- to_add.append((from_oid, tid, to_oid))
-
- if to_add:
- stmt = """
- INSERT INTO object_ref (zoid, tid, to_zoid)
- VALUES (%s, %s, %s)
- """
- cursor.executemany(stmt, to_add)
-
- # The references have been computed for this transaction.
- stmt = """
- INSERT INTO object_refs_added (tid)
- VALUES (%s)
- """
- cursor.execute(stmt, (tid,))
-
-
- def pack(self, pack_tid):
- """Pack. Requires populated pack tables."""
-
- # Read committed mode is sufficient.
- conn, cursor = self.open()
- try:
- try:
- # hold the commit lock for a moment to prevent deadlocks.
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
-
- for table in ('object_ref', 'current_object', 'object_state'):
-
- # Remove objects that are in pack_object and have keep
- # set to false.
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = false
- )
- """ % table
- cursor.execute(stmt)
-
- if table != 'current_object':
- # Cut the history of objects in pack_object that
- # have keep set to true.
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = true
- )
- AND tid < (
- SELECT keep_tid
- FROM pack_object
- WHERE zoid = %s.zoid
- )
- """ % (table, table)
- cursor.execute(stmt)
-
- stmt = """
- -- Terminate prev_tid chains
- UPDATE object_state SET prev_tid = 0
- WHERE tid <= %(tid)s
- AND prev_tid != 0;
-
- -- For each tid to be removed, delete the corresponding row in
- -- object_refs_added.
- DELETE FROM object_refs_added
- WHERE tid > 0
- AND tid <= %(tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = object_refs_added.tid
- );
-
- -- Delete transactions no longer used.
- DELETE FROM transaction
- WHERE tid > 0
- AND tid <= %(tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = transaction.tid
- );
-
- -- Mark the remaining packable transactions as packed
- UPDATE transaction SET packed = true
- WHERE tid > 0
- AND tid <= %(tid)s
- AND packed = false
- """
- cursor.execute(stmt, {'tid': pack_tid})
-
- # Clean up
- cursor.execute("TRUNCATE pack_object")
-
- except:
- conn.rollback()
- raise
-
- else:
- conn.commit()
-
- finally:
- self.close(conn, cursor)
-
-
def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
"""Polls for new transactions.
More information about the Checkins mailing list