[Checkins] SVN: relstorage/trunk/relstorage/ Split the abstract adapter base into history-preserving and
Shane Hathaway
shane at hathawaymix.org
Wed Sep 23 03:38:44 EDT 2009
Log message for revision 104444:
Split the abstract adapter base into history-preserving and
history-free derivatives.
Changed:
A relstorage/trunk/relstorage/adapters/abstract.py
D relstorage/trunk/relstorage/adapters/common.py
A relstorage/trunk/relstorage/adapters/historyfree.py
A relstorage/trunk/relstorage/adapters/historypreserving.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/relstorage.py
-=-
Added: relstorage/trunk/relstorage/adapters/abstract.py
===================================================================
--- relstorage/trunk/relstorage/adapters/abstract.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/abstract.py 2009-09-23 07:38:44 UTC (rev 104444)
@@ -0,0 +1,414 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Code common to most adapters."""
+
+import logging
+import time
+
+try:
+ from hashlib import md5
+except ImportError:
+ from md5 import new as md5
+
+
+log = logging.getLogger(__name__)
+
+# Notes about adapters:
+#
+# An adapter must not hold a connection, cursor, or database state, because
+# RelStorage opens multiple concurrent connections using a single adapter
+# instance.
+# Within the context of an adapter, all OID and TID values are integers,
+# not binary strings, except as noted.
+
+class AbstractAdapter(object):
+ """Common code for a database adapter.
+ """
+
+ keep_history = None # True or False
+ verify_sane_database = False
+
+ # _script_vars contains replacements for statements in scripts.
+ # These are correct for PostgreSQL and MySQL but not for Oracle.
+ _script_vars = {
+ 'TRUE': 'TRUE',
+ 'FALSE': 'FALSE',
+ 'OCTET_LENGTH': 'OCTET_LENGTH',
+ 'TRUNCATE': 'TRUNCATE',
+ 'oid': '%(oid)s',
+ 'tid': '%(tid)s',
+ 'pack_tid': '%(pack_tid)s',
+ 'undo_tid': '%(undo_tid)s',
+ 'self_tid': '%(self_tid)s',
+ 'min_tid': '%(min_tid)s',
+ 'max_tid': '%(max_tid)s',
+ }
+
+ def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ params should be either an empty tuple (no parameters) or
+ a map.
+
+ Subclasses may override this.
+ The input statement is generic and needs to be transformed
+ into a database-specific statement.
+ """
+ stmt = generic_stmt % self._script_vars
+ try:
+ cursor.execute(stmt, generic_params)
+ except:
+ log.warning("script statement failed: %r; parameters: %r",
+ stmt, generic_params)
+ raise
+
+ def _run_script(self, cursor, script, params=()):
+ """Execute a series of statements in the database.
+
+ params should be either an empty tuple (no parameters) or
+ a map.
+
+ The statements are transformed by _run_script_stmt
+ before execution.
+ """
+ lines = []
+ for line in script.split('\n'):
+ line = line.strip()
+ if not line or line.startswith('--'):
+ continue
+ if line.endswith(';'):
+ line = line[:-1]
+ lines.append(line)
+ stmt = '\n'.join(lines)
+ self._run_script_stmt(cursor, stmt, params)
+ lines = []
+ else:
+ lines.append(line)
+ if lines:
+ stmt = '\n'.join(lines)
+ self._run_script_stmt(cursor, stmt, params)
+
+ def _run_many(self, cursor, stmt, items):
+ """Execute a statement repeatedly. Items should be a list of tuples.
+
+ stmt should use '%s' parameter format. Overridden by adapters
+ that use a different parameter format.
+ """
+ cursor.executemany(stmt, items)
+
+ def _open_and_call(self, callback):
+ """Call a function with an open connection and cursor.
+
+ If the function returns, commits the transaction and returns the
+ result returned by the function.
+ If the function raises an exception, aborts the transaction
+ then propagates the exception.
+ """
+ conn, cursor = self.open()
+ try:
+ try:
+ res = callback(conn, cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ return res
+ finally:
+ self.close(conn, cursor)
+
+ def md5sum(self, data):
+ if data is not None:
+ return md5(data).hexdigest()
+ else:
+ # George Bailey object
+ return None
+
+ def iter_objects(self, cursor, tid):
+ """Iterate over object states in a transaction.
+
+ Yields (oid, prev_tid, state) for each object state.
+ """
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ ORDER BY zoid
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+ for oid, state in cursor:
+ if hasattr(state, 'read'):
+ # Oracle
+ state = state.read()
+ yield oid, state
+
+ def open_for_pre_pack(self):
+ """Open a connection to be used for the pre-pack phase.
+ Returns (conn, cursor).
+
+ Subclasses may override this.
+ """
+ return self.open()
+
+ def _hold_commit_lock(self, cursor):
+ """Hold the commit lock for packing"""
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+ def _release_commit_lock(self, cursor):
+ """Release the commit lock during packing"""
+ # no action needed
+ pass
+
+
+ def fill_object_refs(self, conn, cursor, get_references):
+ """Update the object_refs table by analyzing new transactions."""
+ if self.keep_history:
+ stmt = """
+ SELECT transaction.tid
+ FROM transaction
+ LEFT JOIN object_refs_added
+ ON (transaction.tid = object_refs_added.tid)
+ WHERE object_refs_added.tid IS NULL
+ ORDER BY transaction.tid
+ """
+ else:
+ stmt = """
+ SELECT transaction.tid
+ FROM (SELECT DISTINCT tid FROM object_state) AS transaction
+ LEFT JOIN object_refs_added
+ ON (transaction.tid = object_refs_added.tid)
+ WHERE object_refs_added.tid IS NULL
+ ORDER BY transaction.tid
+ """
+
+ self._run_script_stmt(cursor, stmt)
+ tids = [tid for (tid,) in cursor]
+ if tids:
+ added = 0
+ log.info("discovering references from objects in %d "
+ "transaction(s)" % len(tids))
+ for tid in tids:
+ added += self._add_refs_for_tid(cursor, tid, get_references)
+ if added >= 10000:
+ # save the work done so far
+ conn.commit()
+ added = 0
+ if added:
+ conn.commit()
+
+ def _add_refs_for_tid(self, cursor, tid, get_references):
+ """Fill object_refs with all states for a transaction.
+
+ Returns the number of references added.
+ """
+ log.debug("pre_pack: transaction %d: computing references ", tid)
+ from_count = 0
+
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+ add_rows = [] # [(from_oid, tid, to_oid)]
+ for from_oid, state in cursor:
+ if hasattr(state, 'read'):
+ # Oracle
+ state = state.read()
+ if state:
+ from_count += 1
+ try:
+ to_oids = get_references(str(state))
+ except:
+ log.error("pre_pack: can't unpickle "
+ "object %d in transaction %d; state length = %d" % (
+ from_oid, tid, len(state)))
+ raise
+ if self.keep_history:
+ for to_oid in to_oids:
+ add_rows.append((from_oid, tid, to_oid))
+ else:
+ for to_oid in to_oids:
+ add_rows.append((from_oid, to_oid))
+
+ if self.keep_history:
+ stmt = """
+ INSERT INTO object_ref (zoid, tid, to_zoid)
+ VALUES (%s, %s, %s)
+ """
+ self._run_many(cursor, stmt, add_rows)
+
+ else:
+ stmt = """
+ DELETE FROM object_ref
+ WHERE zoid in (
+ SELECT zoid
+ FROM object_state
+ WHERE tid = %(tid)s
+ )
+ """
+ self._run_script(cursor, stmt, {'tid': tid})
+
+ stmt = """
+ INSERT INTO object_ref (zoid, to_zoid)
+ VALUES (%s, %s)
+ """
+ self._run_many(cursor, stmt, add_rows)
+
+ # The references have been computed for this transaction.
+ stmt = """
+ INSERT INTO object_refs_added (tid)
+ VALUES (%(tid)s)
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+ to_count = len(add_rows)
+ log.debug("pre_pack: transaction %d: has %d reference(s) "
+ "from %d object(s)", tid, to_count, from_count)
+ return to_count
+
+
+ def _visit_all_references(self, cursor):
+ """Visit all references in pack_object and set the keep flags.
+ """
+ # Each of the objects to be kept might refer to other objects.
+ # Mark the referenced objects to be kept as well. Do this
+ # repeatedly until all references have been satisfied.
+ pass_num = 1
+ while True:
+ log.info("pre_pack: following references, pass %d", pass_num)
+
+ # Make a list of all parent objects that still need to be
+ # visited. Then set pack_object.visited for all pack_object
+ # rows with keep = true.
+ stmt = """
+ %(TRUNCATE)s temp_pack_visit;
+
+ INSERT INTO temp_pack_visit (zoid, keep_tid)
+ SELECT zoid, keep_tid
+ FROM pack_object
+ WHERE keep = %(TRUE)s
+ AND visited = %(FALSE)s;
+
+ UPDATE pack_object SET visited = %(TRUE)s
+ WHERE keep = %(TRUE)s
+ AND visited = %(FALSE)s
+ """
+ self._run_script(cursor, stmt)
+ visit_count = cursor.rowcount
+
+ if self.verify_sane_database:
+ # Verify the update actually worked.
+ # MySQL 5.1.23 fails this test; 5.1.24 passes.
+ stmt = """
+ SELECT 1
+ FROM pack_object
+ WHERE keep = %(TRUE)s AND visited = %(FALSE)s
+ """
+ self._run_script_stmt(cursor, stmt)
+ if list(cursor):
+ raise AssertionError(
+ "database failed to update pack_object")
+
+ log.debug("pre_pack: checking references from %d object(s)",
+ visit_count)
+
+ # Visit the children of all parent objects that were
+ # just visited.
+ stmt = self._scripts['pre_pack_follow_child_refs']
+ self._run_script(cursor, stmt)
+ found_count = cursor.rowcount
+
+ log.debug("pre_pack: found %d more referenced object(s) in "
+ "pass %d", found_count, pass_num)
+ if not found_count:
+ # No new references detected.
+ break
+ else:
+ pass_num += 1
+
+
+ def _pause_pack(self, sleep, options, start):
+ """Pause packing to allow concurrent commits."""
+ elapsed = time.time() - start
+ if elapsed == 0.0:
+ # Compensate for low timer resolution by
+ # assuming that at least 10 ms elapsed.
+ elapsed = 0.01
+ duty_cycle = options.pack_duty_cycle
+ if duty_cycle > 0.0 and duty_cycle < 1.0:
+ delay = min(options.pack_max_delay,
+ elapsed * (1.0 / duty_cycle - 1.0))
+ if delay > 0:
+ log.debug('pack: sleeping %.4g second(s)', delay)
+ sleep(delay)
+
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid).
+ """
+ # find out the tid of the most recent transaction.
+ cursor.execute(self._poll_query)
+ new_polled_tid = cursor.fetchone()[0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ if self.keep_history:
+ stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
+ else:
+ stmt = "SELECT 1 FROM object_state WHERE tid <= %(tid)s LIMIT 1"
+ cursor.execute(intern(stmt % self._script_vars),
+ {'tid': prev_polled_tid})
+ rows = cursor.fetchall()
+ if not rows:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache needs to be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ if ignore_tid is None:
+ stmt = """
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(tid)s
+ """
+ cursor.execute(intern(stmt % self._script_vars),
+ {'tid': prev_polled_tid})
+ else:
+ stmt = """
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(tid)s
+ AND tid != %(self_tid)s
+ """
+ cursor.execute(intern(stmt % self._script_vars),
+ {'tid': prev_polled_tid, 'self_tid': ignore_tid})
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
Deleted: relstorage/trunk/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py 2009-09-23 07:33:30 UTC (rev 104443)
+++ relstorage/trunk/relstorage/adapters/common.py 2009-09-23 07:38:44 UTC (rev 104444)
@@ -1,989 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Code common to most adapters."""
-
-from ZODB.POSException import UndoError
-
-import logging
-import time
-
-try:
- from hashlib import md5
-except ImportError:
- from md5 import new as md5
-
-
-log = logging.getLogger("relstorage.adapters.common")
-
-verify_sane_database = False
-
-
-# Notes about adapters:
-#
-# An adapter must not hold a connection, cursor, or database state, because
-# RelStorage opens multiple concurrent connections using a single adapter
-# instance.
-# Within the context of an adapter, all OID and TID values are integers,
-# not binary strings, except as noted.
-
-class Adapter(object):
- """Common code for a database adapter.
-
- This is an abstract class; a lot of methods are expected to be
- provided by subclasses.
- """
-
- # _script_vars contains replacements for statements in scripts.
- # These are correct for PostgreSQL and MySQL but not for Oracle.
- _script_vars = {
- 'TRUE': 'TRUE',
- 'FALSE': 'FALSE',
- 'OCTET_LENGTH': 'OCTET_LENGTH',
- 'TRUNCATE': 'TRUNCATE',
- 'oid': '%(oid)s',
- 'tid': '%(tid)s',
- 'pack_tid': '%(pack_tid)s',
- 'undo_tid': '%(undo_tid)s',
- 'self_tid': '%(self_tid)s',
- 'min_tid': '%(min_tid)s',
- 'max_tid': '%(max_tid)s',
- }
-
- _scripts = {
- 'choose_pack_transaction': """
- SELECT tid
- FROM transaction
- WHERE tid > 0
- AND tid <= %(tid)s
- AND packed = FALSE
- ORDER BY tid DESC
- LIMIT 1
- """,
-
- 'create_temp_pack_visit': """
- CREATE TEMPORARY TABLE temp_pack_visit (
- zoid BIGINT NOT NULL,
- keep_tid BIGINT
- );
- CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
- """,
-
- 'create_temp_undo': """
- CREATE TEMPORARY TABLE temp_undo (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL
- );
- CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
- """,
-
- 'reset_temp_undo': "DROP TABLE temp_undo",
-
- 'transaction_has_data': """
- SELECT tid
- FROM object_state
- WHERE tid = %(tid)s
- LIMIT 1
- """,
-
- 'prepack_follow_child_refs': """
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE keep = %(FALSE)s
- AND zoid IN (
- SELECT DISTINCT to_zoid
- FROM object_ref
- JOIN temp_pack_visit USING (zoid)
- WHERE object_ref.tid >= temp_pack_visit.keep_tid
- )
- """,
-
- 'pack_current_object': """
- DELETE FROM current_object
- WHERE tid = %(tid)s
- AND zoid in (
- SELECT pack_state.zoid
- FROM pack_state
- WHERE pack_state.tid = %(tid)s
- )
- """,
-
- 'pack_object_state': """
- DELETE FROM object_state
- WHERE tid = %(tid)s
- AND zoid in (
- SELECT pack_state.zoid
- FROM pack_state
- WHERE pack_state.tid = %(tid)s
- )
- """,
-
- 'pack_object_ref': """
- DELETE FROM object_refs_added
- WHERE tid IN (
- SELECT tid
- FROM transaction
- WHERE empty = %(TRUE)s
- );
- DELETE FROM object_ref
- WHERE tid IN (
- SELECT tid
- FROM transaction
- WHERE empty = %(TRUE)s
- )
- """,
- }
-
- def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
- """Execute a statement from a script with the given parameters.
-
- Subclasses may override this.
- The input statement is generic and needs to be transformed
- into a database-specific statement.
- """
- stmt = generic_stmt % self._script_vars
- try:
- cursor.execute(stmt, generic_params)
- except:
- log.warning("script statement failed: %r; parameters: %r",
- stmt, generic_params)
- raise
-
-
- def _run_script(self, cursor, script, params=()):
- """Execute a series of statements in the database.
-
- The statements are transformed by _run_script_stmt
- before execution.
- """
- lines = []
- for line in script.split('\n'):
- line = line.strip()
- if not line or line.startswith('--'):
- continue
- if line.endswith(';'):
- line = line[:-1]
- lines.append(line)
- stmt = '\n'.join(lines)
- self._run_script_stmt(cursor, stmt, params)
- lines = []
- else:
- lines.append(line)
- if lines:
- stmt = '\n'.join(lines)
- self._run_script_stmt(cursor, stmt, params)
-
- def _open_and_call(self, callback):
- """Call a function with an open connection and cursor.
-
- If the function returns, commits the transaction and returns the
- result returned by the function.
- If the function raises an exception, aborts the transaction
- then propagates the exception.
- """
- conn, cursor = self.open()
- try:
- try:
- res = callback(conn, cursor)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- return res
- finally:
- self.close(conn, cursor)
-
- def _transaction_iterator(self, cursor):
- """Iterate over a list of transactions returned from the database.
-
- Each row begins with (tid, username, description, extension)
- and may have other columns.
- """
- for row in cursor:
- tid, username, description, ext = row[:4]
- if username is None:
- username = ''
- else:
- username = str(username)
- if description is None:
- description = ''
- else:
- description = str(description)
- if ext is None:
- ext = ''
- else:
- ext = str(ext)
- yield (tid, username, description, ext) + tuple(row[4:])
-
-
- def iter_transactions(self, cursor):
- """Iterate over the transaction log, newest first.
-
- Skips packed transactions.
- Yields (tid, username, description, extension) for each transaction.
- """
- stmt = """
- SELECT tid, username, description, extension
- FROM transaction
- WHERE packed = %(FALSE)s
- AND tid != 0
- ORDER BY tid DESC
- """
- self._run_script_stmt(cursor, stmt)
- return self._transaction_iterator(cursor)
-
-
- def iter_transactions_range(self, cursor, start=None, stop=None):
- """Iterate over the transactions in the given range, oldest first.
-
- Includes packed transactions.
- Yields (tid, username, description, extension, packed)
- for each transaction.
- """
- stmt = """
- SELECT tid, username, description, extension,
- CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END
- FROM transaction
- WHERE tid >= 0
- """
- if start is not None:
- stmt += " AND tid >= %(min_tid)s"
- if stop is not None:
- stmt += " AND tid <= %(max_tid)s"
- stmt += " ORDER BY tid"
- self._run_script_stmt(cursor, stmt,
- {'min_tid': start, 'max_tid': stop})
- return self._transaction_iterator(cursor)
-
-
- def iter_object_history(self, cursor, oid):
- """Iterate over an object's history.
-
- Raises KeyError if the object does not exist.
- Yields (tid, username, description, extension, pickle_size)
- for each modification.
- """
- stmt = """
- SELECT 1 FROM current_object WHERE zoid = %(oid)s
- """
- self._run_script_stmt(cursor, stmt, {'oid': oid})
- if not cursor.fetchall():
- raise KeyError(oid)
-
- stmt = """
- SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
- FROM transaction
- JOIN object_state USING (tid)
- WHERE zoid = %(oid)s
- AND packed = %(FALSE)s
- ORDER BY tid DESC
- """
- self._run_script_stmt(cursor, stmt, {'oid': oid})
- return self._transaction_iterator(cursor)
-
-
- def iter_objects(self, cursor, tid):
- """Iterate over object states in a transaction.
-
- Yields (oid, prev_tid, state) for each object state.
- """
- stmt = """
- SELECT zoid, state
- FROM object_state
- WHERE tid = %(tid)s
- ORDER BY zoid
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- for oid, state in cursor:
- if hasattr(state, 'read'):
- # Oracle
- state = state.read()
- yield oid, state
-
-
- def verify_undoable(self, cursor, undo_tid):
- """Raise UndoError if it is not safe to undo the specified txn."""
- stmt = """
- SELECT 1 FROM transaction
- WHERE tid = %(undo_tid)s
- AND packed = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if not cursor.fetchall():
- raise UndoError("Transaction not found or packed")
-
- # Rule: we can undo an object if the object's state in the
- # transaction to undo matches the object's current state.
- # If any object in the transaction does not fit that rule,
- # refuse to undo.
- stmt = """
- SELECT prev_os.zoid, current_object.tid
- FROM object_state prev_os
- JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
- JOIN current_object ON (cur_os.zoid = current_object.zoid
- AND cur_os.tid = current_object.tid)
- WHERE prev_os.tid = %(undo_tid)s
- AND cur_os.md5 != prev_os.md5
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if cursor.fetchmany():
- raise UndoError(
- "Some data were modified by a later transaction")
-
- # Rule: don't allow the creation of the root object to
- # be undone. It's hard to get it back.
- stmt = """
- SELECT 1
- FROM object_state
- WHERE tid = %(undo_tid)s
- AND zoid = 0
- AND prev_tid = 0
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if cursor.fetchall():
- raise UndoError("Can't undo the creation of the root object")
-
-
- def undo(self, cursor, undo_tid, self_tid):
- """Undo a transaction.
-
- Parameters: "undo_tid", the integer tid of the transaction to undo,
- and "self_tid", the integer tid of the current transaction.
-
- Returns the states copied forward by the undo operation as a
- list of (oid, old_tid).
- """
- stmt = self._scripts['create_temp_undo']
- if stmt:
- self._run_script(cursor, stmt)
-
- stmt = """
- DELETE FROM temp_undo;
-
- -- Put into temp_undo the list of objects to be undone and
- -- the tid of the transaction that has the undone state.
- INSERT INTO temp_undo (zoid, prev_tid)
- SELECT zoid, prev_tid
- FROM object_state
- WHERE tid = %(undo_tid)s;
-
- -- Override previous undo operations within this transaction
- -- by resetting the current_object pointer and deleting
- -- copied states from object_state.
- UPDATE current_object
- SET tid = (
- SELECT prev_tid
- FROM object_state
- WHERE zoid = current_object.zoid
- AND tid = %(self_tid)s
- )
- WHERE zoid IN (SELECT zoid FROM temp_undo)
- AND tid = %(self_tid)s;
-
- DELETE FROM object_state
- WHERE zoid IN (SELECT zoid FROM temp_undo)
- AND tid = %(self_tid)s;
-
- -- Copy old states forward.
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
- prev.md5, prev.state
- FROM temp_undo
- JOIN current_object ON (temp_undo.zoid = current_object.zoid)
- LEFT JOIN object_state prev
- ON (prev.zoid = temp_undo.zoid
- AND prev.tid = temp_undo.prev_tid);
-
- -- List the copied states.
- SELECT zoid, prev_tid FROM temp_undo
- """
- self._run_script(cursor, stmt,
- {'undo_tid': undo_tid, 'self_tid': self_tid})
- res = list(cursor)
-
- stmt = self._scripts['reset_temp_undo']
- if stmt:
- self._run_script(cursor, stmt)
-
- return res
-
-
- def choose_pack_transaction(self, pack_point):
- """Return the transaction before or at the specified pack time.
-
- Returns None if there is nothing to pack.
- """
- conn, cursor = self.open()
- try:
- stmt = self._scripts['choose_pack_transaction']
- self._run_script(cursor, stmt, {'tid': pack_point})
- rows = cursor.fetchall()
- if not rows:
- # Nothing needs to be packed.
- return None
- return rows[0][0]
- finally:
- self.close(conn, cursor)
-
-
- def open_for_pre_pack(self):
- """Open a connection to be used for the pre-pack phase.
- Returns (conn, cursor).
-
- Subclasses may override this.
- """
- return self.open()
-
-
- def pre_pack(self, pack_tid, get_references, options):
- """Decide what to pack.
-
- tid specifies the most recent transaction to pack.
-
- get_references is a function that accepts a pickled state and
- returns a set of OIDs that state refers to.
-
- options is an instance of relstorage.Options.
- The options.pack_gc flag indicates whether to run garbage collection.
- If pack_gc is false, at least one revision of every object is kept,
- even if nothing refers to it. Packing with pack_gc disabled can be
- much faster.
- """
- conn, cursor = self.open_for_pre_pack()
- try:
- try:
- if options.pack_gc:
- log.info("pre_pack: start with gc enabled")
- self._pre_pack_with_gc(
- conn, cursor, pack_tid, get_references)
- else:
- log.info("pre_pack: start without gc")
- self._pre_pack_without_gc(
- conn, cursor, pack_tid)
- conn.commit()
-
- log.info("pre_pack: enumerating states to pack")
- stmt = "%(TRUNCATE)s pack_state"
- self._run_script_stmt(cursor, stmt)
- to_remove = 0
-
- if options.pack_gc:
- # Pack objects with the keep flag set to false.
- stmt = """
- INSERT INTO pack_state (tid, zoid)
- SELECT tid, zoid
- FROM object_state
- JOIN pack_object USING (zoid)
- WHERE keep = %(FALSE)s
- AND tid > 0
- AND tid <= %(pack_tid)s
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
- to_remove += cursor.rowcount
-
- # Pack object states with the keep flag set to true.
- stmt = """
- INSERT INTO pack_state (tid, zoid)
- SELECT tid, zoid
- FROM object_state
- JOIN pack_object USING (zoid)
- WHERE keep = %(TRUE)s
- AND tid > 0
- AND tid != keep_tid
- AND tid <= %(pack_tid)s
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid':pack_tid})
- to_remove += cursor.rowcount
-
- log.info("pre_pack: enumerating transactions to pack")
- stmt = "%(TRUNCATE)s pack_state_tid"
- self._run_script_stmt(cursor, stmt)
- stmt = """
- INSERT INTO pack_state_tid (tid)
- SELECT DISTINCT tid
- FROM pack_state
- """
- cursor.execute(stmt)
-
- log.info("pre_pack: will remove %d object state(s)",
- to_remove)
-
- except:
- log.exception("pre_pack: failed")
- conn.rollback()
- raise
- else:
- log.info("pre_pack: finished successfully")
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def _pre_pack_without_gc(self, conn, cursor, pack_tid):
- """Determine what to pack, without garbage collection.
-
- With garbage collection disabled, there is no need to follow
- object references.
- """
- # Fill the pack_object table with OIDs, but configure them
- # all to be kept by setting keep to true.
- log.debug("pre_pack: populating pack_object")
- stmt = """
- %(TRUNCATE)s pack_object;
-
- INSERT INTO pack_object (zoid, keep, keep_tid)
- SELECT zoid, %(TRUE)s, MAX(tid)
- FROM object_state
- WHERE tid > 0 AND tid <= %(pack_tid)s
- GROUP BY zoid
- """
- self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
-
- def _pre_pack_with_gc(self, conn, cursor, pack_tid, get_references):
- """Determine what to pack, with garbage collection.
- """
- stmt = self._scripts['create_temp_pack_visit']
- if stmt:
- self._run_script(cursor, stmt)
-
- self.fill_object_refs(conn, cursor, get_references)
-
- log.info("pre_pack: filling the pack_object table")
- # Fill the pack_object table with OIDs that either will be
- # removed (if nothing references the OID) or whose history will
- # be cut.
- stmt = """
- %(TRUNCATE)s pack_object;
-
- INSERT INTO pack_object (zoid, keep, keep_tid)
- SELECT zoid, %(FALSE)s, MAX(tid)
- FROM object_state
- WHERE tid > 0 AND tid <= %(pack_tid)s
- GROUP BY zoid;
-
- -- If the root object is in pack_object, keep it.
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE zoid = 0;
-
- -- Keep objects that have been revised since pack_tid.
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE zoid IN (
- SELECT zoid
- FROM current_object
- WHERE tid > %(pack_tid)s
- );
-
- -- Keep objects that are still referenced by object states in
- -- transactions that will not be packed.
- -- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
- INSERT INTO temp_pack_visit (zoid)
- SELECT DISTINCT to_zoid
- FROM object_ref
- WHERE tid > %(pack_tid)s;
-
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE zoid IN (
- SELECT zoid
- FROM temp_pack_visit
- );
-
- %(TRUNCATE)s temp_pack_visit;
- """
- self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
- # Each of the packable objects to be kept might
- # refer to other objects. If some of those references
- # include objects currently set to be removed, mark
- # the referenced objects to be kept as well. Do this
- # repeatedly until all references have been satisfied.
- pass_num = 1
- while True:
- log.info("pre_pack: following references, pass %d", pass_num)
-
- # Make a list of all parent objects that still need
- # to be visited. Then set pack_object.visited for all pack_object
- # rows with keep = true.
- stmt = """
- %(TRUNCATE)s temp_pack_visit;
-
- INSERT INTO temp_pack_visit (zoid, keep_tid)
- SELECT zoid, keep_tid
- FROM pack_object
- WHERE keep = %(TRUE)s
- AND visited = %(FALSE)s;
-
- UPDATE pack_object SET visited = %(TRUE)s
- WHERE keep = %(TRUE)s
- AND visited = %(FALSE)s
- """
- self._run_script(cursor, stmt)
- visit_count = cursor.rowcount
-
- if verify_sane_database:
- # Verify the update actually worked.
- # MySQL 5.1.23 fails this test; 5.1.24 passes.
- stmt = """
- SELECT 1
- FROM pack_object
- WHERE keep = %(TRUE)s AND visited = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt)
- if list(cursor):
- raise AssertionError(
- "database failed to update pack_object")
-
- log.debug("pre_pack: checking references from %d object(s)",
- visit_count)
-
- # Visit the children of all parent objects that were
- # just visited.
- stmt = self._scripts['prepack_follow_child_refs']
- self._run_script(cursor, stmt)
- found_count = cursor.rowcount
-
- log.debug("pre_pack: found %d more referenced object(s) in "
- "pass %d", found_count, pass_num)
- if not found_count:
- # No new references detected.
- break
- else:
- pass_num += 1
-
-
- def _add_object_ref_rows(self, cursor, add_rows):
- """Add rows to object_ref.
-
- The input rows are tuples containing (from_zoid, tid, to_zoid).
-
- Subclasses can override this.
- """
- stmt = """
- INSERT INTO object_ref (zoid, tid, to_zoid)
- VALUES (%s, %s, %s)
- """
- cursor.executemany(stmt, add_rows)
-
-
- def _add_refs_for_tid(self, cursor, tid, get_references):
- """Fill object_refs with all states for a transaction.
-
- Returns the number of references added.
- """
- log.debug("pre_pack: transaction %d: computing references ", tid)
- from_count = 0
-
- stmt = """
- SELECT zoid, state
- FROM object_state
- WHERE tid = %(tid)s
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
- add_rows = [] # [(from_oid, tid, to_oid)]
- for from_oid, state in cursor:
- if hasattr(state, 'read'):
- # Oracle
- state = state.read()
- if state:
- from_count += 1
- try:
- to_oids = get_references(str(state))
- except:
- log.error("pre_pack: can't unpickle "
- "object %d in transaction %d; state length = %d" % (
- from_oid, tid, len(state)))
- raise
- for to_oid in to_oids:
- add_rows.append((from_oid, tid, to_oid))
-
- if add_rows:
- self._add_object_ref_rows(cursor, add_rows)
-
- # The references have been computed for this transaction.
- stmt = """
- INSERT INTO object_refs_added (tid)
- VALUES (%(tid)s)
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
- to_count = len(add_rows)
- log.debug("pre_pack: transaction %d: has %d reference(s) "
- "from %d object(s)", tid, to_count, from_count)
- return to_count
-
-
- def fill_object_refs(self, conn, cursor, get_references):
- """Update the object_refs table by analyzing new transactions."""
- stmt = """
- SELECT transaction.tid
- FROM transaction
- LEFT JOIN object_refs_added
- ON (transaction.tid = object_refs_added.tid)
- WHERE object_refs_added.tid IS NULL
- ORDER BY transaction.tid
- """
- cursor.execute(stmt)
- tids = [tid for (tid,) in cursor]
- if tids:
- added = 0
- log.info("discovering references from objects in %d "
- "transaction(s)" % len(tids))
- for tid in tids:
- added += self._add_refs_for_tid(cursor, tid, get_references)
- if added >= 10000:
- # save the work done so far
- conn.commit()
- added = 0
- if added:
- conn.commit()
-
-
- def _hold_commit_lock(self, cursor):
- """Hold the commit lock for packing"""
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
-
-
- def _release_commit_lock(self, cursor):
- """Release the commit lock during packing"""
- # no action needed
- pass
-
-
- def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
- """Pack. Requires the information provided by pre_pack."""
-
- # Read committed mode is sufficient.
- conn, cursor = self.open()
- try:
- try:
- stmt = """
- SELECT transaction.tid,
- CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
- CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
- FROM transaction
- LEFT JOIN pack_state_tid ON (
- transaction.tid = pack_state_tid.tid)
- WHERE transaction.tid > 0
- AND transaction.tid <= %(pack_tid)s
- AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
- tid_rows = list(cursor)
- tid_rows.sort() # oldest first
-
- log.info("pack: will pack %d transaction(s)", len(tid_rows))
-
- stmt = self._scripts['create_temp_pack_visit']
- if stmt:
- self._run_script(cursor, stmt)
-
- # Hold the commit lock while packing to prevent deadlocks.
- # Pack in small batches of transactions in order to minimize
- # the interruption of concurrent write operations.
- start = time.time()
- packed_list = []
- self._hold_commit_lock(cursor)
- for tid, packed, has_removable in tid_rows:
- self._pack_transaction(
- cursor, pack_tid, tid, packed, has_removable,
- packed_list)
- if time.time() >= start + options.pack_batch_timeout:
- conn.commit()
- if packed_func is not None:
- for oid, tid in packed_list:
- packed_func(oid, tid)
- del packed_list[:]
- self._release_commit_lock(cursor)
- self._pause_pack(sleep, options, start)
- self._hold_commit_lock(cursor)
- start = time.time()
- if packed_func is not None:
- for oid, tid in packed_list:
- packed_func(oid, tid)
- packed_list = None
-
- self._pack_cleanup(conn, cursor)
-
- except:
- log.exception("pack: failed")
- conn.rollback()
- raise
-
- else:
- log.info("pack: finished successfully")
- conn.commit()
-
- finally:
- self.close(conn, cursor)
-
- def _pause_pack(self, sleep, options, start):
- """Pause packing to allow concurrent commits."""
- elapsed = time.time() - start
- if elapsed == 0.0:
- # Compensate for low timer resolution by
- # assuming that at least 10 ms elapsed.
- elapsed = 0.01
- duty_cycle = options.pack_duty_cycle
- if duty_cycle > 0.0 and duty_cycle < 1.0:
- delay = min(options.pack_max_delay,
- elapsed * (1.0 / duty_cycle - 1.0))
- if delay > 0:
- log.debug('pack: sleeping %.4g second(s)', delay)
- sleep(delay)
-
- def _pack_transaction(self, cursor, pack_tid, tid, packed,
- has_removable, packed_list):
- """Pack one transaction. Requires populated pack tables."""
- log.debug("pack: transaction %d: packing", tid)
- removed_objects = 0
- removed_states = 0
-
- if has_removable:
- stmt = self._scripts['pack_current_object']
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- removed_objects = cursor.rowcount
-
- stmt = self._scripts['pack_object_state']
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- removed_states = cursor.rowcount
-
- # Terminate prev_tid chains
- stmt = """
- UPDATE object_state SET prev_tid = 0
- WHERE prev_tid = %(tid)s
- AND tid <= %(pack_tid)s
- """
- self._run_script_stmt(cursor, stmt,
- {'pack_tid': pack_tid, 'tid': tid})
-
- stmt = """
- SELECT pack_state.zoid
- FROM pack_state
- WHERE pack_state.tid = %(tid)s
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- for (oid,) in cursor:
- packed_list.append((oid, tid))
-
- # Find out whether the transaction is empty
- stmt = self._scripts['transaction_has_data']
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- empty = not list(cursor)
-
- # mark the transaction packed and possibly empty
- if empty:
- clause = 'empty = %(TRUE)s'
- state = 'empty'
- else:
- clause = 'empty = %(FALSE)s'
- state = 'not empty'
- stmt = "UPDATE transaction SET packed = %(TRUE)s, " + clause
- stmt += " WHERE tid = %(tid)s"
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
- log.debug(
- "pack: transaction %d (%s): removed %d object(s) and %d state(s)",
- tid, state, removed_objects, removed_states)
-
-
- def _pack_cleanup(self, conn, cursor):
- """Remove unneeded table rows after packing"""
- # commit the work done so far
- conn.commit()
- self._release_commit_lock(cursor)
- self._hold_commit_lock(cursor)
- log.info("pack: cleaning up")
-
- log.debug("pack: removing unused object references")
- stmt = self._scripts['pack_object_ref']
- self._run_script(cursor, stmt)
-
- log.debug("pack: removing empty packed transactions")
- stmt = """
- DELETE FROM transaction
- WHERE packed = %(TRUE)s
- AND empty = %(TRUE)s
- """
- self._run_script_stmt(cursor, stmt)
-
- # perform cleanup that does not require the commit lock
- conn.commit()
- self._release_commit_lock(cursor)
-
- log.debug("pack: clearing temporary pack state")
- for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
- stmt = '%(TRUNCATE)s ' + _table
- self._run_script_stmt(cursor, stmt)
-
-
- def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
- """Polls for new transactions.
-
- conn and cursor must have been created previously by open_for_load().
- prev_polled_tid is the tid returned at the last poll, or None
- if this is the first poll. If ignore_tid is not None, changes
- committed in that transaction will not be included in the list
- of changed OIDs.
-
- Returns (changed_oids, new_polled_tid).
- """
- # find out the tid of the most recent transaction.
- cursor.execute(self._poll_query)
- new_polled_tid = cursor.fetchone()[0]
-
- if prev_polled_tid is None:
- # This is the first time the connection has polled.
- return None, new_polled_tid
-
- if new_polled_tid == prev_polled_tid:
- # No transactions have been committed since prev_polled_tid.
- return (), new_polled_tid
-
- stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
- cursor.execute(intern(stmt % self._script_vars),
- {'tid': prev_polled_tid})
- rows = cursor.fetchall()
- if not rows:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
-
- # Get the list of changed OIDs and return it.
- if ignore_tid is None:
- stmt = """
- SELECT zoid
- FROM current_object
- WHERE tid > %(tid)s
- """
- cursor.execute(intern(stmt % self._script_vars),
- {'tid': prev_polled_tid})
- else:
- stmt = """
- SELECT zoid
- FROM current_object
- WHERE tid > %(tid)s
- AND tid != %(self_tid)s
- """
- cursor.execute(intern(stmt % self._script_vars),
- {'tid': prev_polled_tid, 'self_tid': ignore_tid})
- oids = [oid for (oid,) in cursor]
-
- return oids, new_polled_tid
-
- def md5sum(self, data):
- if data is not None:
- return md5(data).hexdigest()
- else:
- # George Bailey object
- return None
Added: relstorage/trunk/relstorage/adapters/historyfree.py
===================================================================
--- relstorage/trunk/relstorage/adapters/historyfree.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/historyfree.py 2009-09-23 07:38:44 UTC (rev 104444)
@@ -0,0 +1,310 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Code common to history-free adapters."""
+
+
+import logging
+import time
+from relstorage.adapters.abstract import AbstractAdapter
+from ZODB.POSException import UndoError
+
+log = logging.getLogger(__name__)
+
+
+class HistoryFreeAdapter(AbstractAdapter):
+ """An abstract adapter that does not retain history.
+
+ Derivatives should have at least the following schema::
+
+ -- All object states in all transactions.
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL CHECK (tid > 0),
+ state BYTEA
+ );
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during garbage collection.
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL,
+ PRIMARY KEY (zoid, to_zoid)
+ );
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during garbage collection:
+ -- The list of all objects, a flag signifying whether
+ -- the object should be kept, and a flag signifying whether
+ -- the object's references have been visited.
+ -- The keep_tid field specifies the current revision of the object.
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL DEFAULT FALSE,
+ keep_tid BIGINT NOT NULL,
+ visited BOOLEAN NOT NULL DEFAULT FALSE
+ );
+ """
+
+ keep_history = False
+
+ _scripts = {
+ 'create_temp_pack_visit': """
+ CREATE TEMPORARY TABLE temp_pack_visit (
+ zoid BIGINT NOT NULL
+ );
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+ """,
+
+ 'pre_pack_follow_child_refs': """
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN temp_pack_visit USING (zoid)
+ )
+ """,
+ }
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log, newest first.
+
+ Skips packed transactions.
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ ORDER BY tid DESC
+ """
+ self._run_script_stmt(cursor, stmt)
+ return ((tid, '', '', '') for (tid,) in cursor)
+
+ def iter_transactions_range(self, cursor, start=None, stop=None):
+ """Iterate over the transactions in the given range, oldest first.
+
+ Includes packed transactions.
+ Yields (tid, username, description, extension, packed)
+ for each transaction.
+ """
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid > 0
+ """
+ if start is not None:
+ stmt += " AND tid >= %(min_tid)s"
+ if stop is not None:
+ stmt += " AND tid <= %(max_tid)s"
+ stmt += " ORDER BY tid"
+ self._run_script_stmt(cursor, stmt,
+ {'min_tid': start, 'max_tid': stop})
+ return ((tid, '', '', '', True) for (tid,) in cursor)
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT tid, %(OCTET_LENGTH)s(state)
+ FROM object_state
+ WHERE zoid = %(oid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'oid': oid})
+ return ((tid, '', '', '', size) for (tid, size) in cursor)
+
+ def verify_undoable(self, cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn."""
+ raise UndoError("Undo is not supported by this storage")
+
+ def undo(self, cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the list of OIDs undone.
+ """
+ raise UndoError("Undo is not supported by this storage")
+
+ def choose_pack_transaction(self, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ return 1
+
+
+ def pre_pack(self, pack_tid, get_references, options):
+ """Decide what the garbage collector should delete.
+
+ pack_tid is ignored.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+
+ options is an instance of relstorage.Options.
+ The options.pack_gc flag indicates whether to run garbage collection.
+ If pack_gc is false, this method does nothing.
+ """
+ if not options.pack_gc:
+ log.warning("pre_pack: garbage collection is disabled on a "
+ "history-free storage, so doing nothing")
+ return
+
+ conn, cursor = self.open_for_pre_pack()
+ try:
+ try:
+ self._pre_pack_main(conn, cursor, get_references)
+ except:
+ log.exception("pre_pack: failed")
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ log.info("pre_pack: finished successfully")
+ finally:
+ self.close(conn, cursor)
+
+
+ def _pre_pack_main(self, conn, cursor, get_references):
+ """Determine what to garbage collect.
+ """
+ stmt = self._scripts['create_temp_pack_visit']
+ if stmt:
+ self._run_script(cursor, stmt)
+
+ self.fill_object_refs(conn, cursor, get_references)
+
+ log.info("pre_pack: filling the pack_object table")
+ # Fill the pack_object table with all known OIDs.
+ stmt = """
+ %(TRUNCATE)s pack_object;
+
+ INSERT INTO pack_object (zoid, keep_tid)
+ SELECT zoid, tid
+ FROM object_state;
+
+ -- Keep the root object
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE zoid = 0;
+ """
+ self._run_script(cursor, stmt)
+
+ # Set the 'keep' flags in pack_object
+ self._visit_all_references(cursor)
+
+
+ def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
+ """Run garbage collection.
+
+        Requires the information provided by pre_pack.
+ """
+
+ # Read committed mode is sufficient.
+ conn, cursor = self.open()
+ try:
+ try:
+ stmt = """
+ SELECT zoid, keep_tid
+ FROM pack_object
+ WHERE keep = %(FALSE)s
+ """
+ self._run_script_stmt(cursor, stmt)
+ to_remove = list(cursor)
+
+ log.info("pack: will remove %d object(s)", len(to_remove))
+
+ # Hold the commit lock while packing to prevent deadlocks.
+ # Pack in small batches of transactions in order to minimize
+ # the interruption of concurrent write operations.
+ start = time.time()
+ packed_list = []
+ self._hold_commit_lock(cursor)
+
+ for item in to_remove:
+ oid, tid = item
+ stmt = """
+ DELETE FROM object_state
+ WHERE zoid = %(oid)s
+ AND tid = %(tid)s
+ """
+ self._run_script_stmt(
+ cursor, stmt, {'oid': oid, 'tid': tid})
+ packed_list.append(item)
+
+ if time.time() >= start + options.pack_batch_timeout:
+ conn.commit()
+ if packed_func is not None:
+ for oid, tid in packed_list:
+ packed_func(oid, tid)
+ del packed_list[:]
+ self._release_commit_lock(cursor)
+ self._pause_pack(sleep, options, start)
+ self._hold_commit_lock(cursor)
+ start = time.time()
+
+ if packed_func is not None:
+ for oid, tid in packed_list:
+ packed_func(oid, tid)
+ packed_list = None
+
+ self._pack_cleanup(conn, cursor)
+
+ except:
+ log.exception("pack: failed")
+ conn.rollback()
+ raise
+
+ else:
+ log.info("pack: finished successfully")
+ conn.commit()
+
+ finally:
+ self.close(conn, cursor)
+
+
+    def _pack_cleanup(self, conn, cursor):
+ # commit the work done so far
+ conn.commit()
+ self._release_commit_lock(cursor)
+ self._hold_commit_lock(cursor)
+ log.info("pack: cleaning up")
+
+ stmt = """
+ DELETE FROM object_refs_added
+ WHERE tid NOT IN (
+ SELECT DISTINCT tid
+ FROM object_state
+ );
+
+ DELETE FROM object_ref
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %(FALSE)s
+ );
+
+ %(TRUNCATE)s pack_object
+ """
+ self._run_script(cursor, stmt)
Added: relstorage/trunk/relstorage/adapters/historypreserving.py
===================================================================
--- relstorage/trunk/relstorage/adapters/historypreserving.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/historypreserving.py 2009-09-23 07:38:44 UTC (rev 104444)
@@ -0,0 +1,724 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Code common to history-preserving adapters."""
+
+
+import logging
+import time
+from relstorage.adapters.abstract import AbstractAdapter
+from ZODB.POSException import UndoError
+
+log = logging.getLogger(__name__)
+
+
+class HistoryPreservingAdapter(AbstractAdapter):
+ """An abstract adapter that retains history.
+
+ Derivatives should have at least the following schema::
+
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid BIGINT NOT NULL PRIMARY KEY,
+ packed BOOLEAN NOT NULL DEFAULT FALSE,
+ empty BOOLEAN NOT NULL DEFAULT FALSE,
+ username BYTEA NOT NULL,
+ description BYTEA NOT NULL,
+ extension BYTEA
+ );
+
+ -- All object states in all transactions. Note that md5 and state
+ -- can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL REFERENCES transaction
+ CHECK (tid > 0),
+ PRIMARY KEY (zoid, tid),
+ prev_tid BIGINT NOT NULL REFERENCES transaction,
+ md5 CHAR(32),
+ state BYTEA
+ );
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state
+ );
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid, to_zoid)
+ );
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is false,
+ -- the object and all its revisions will be removed.
+ -- If keep is true, instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- The keep_tid field specifies the oldest revision
+ -- of the object to keep.
+ -- The visited flag is set when pre_pack is visiting an object's
+ -- references, and remains set.
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL,
+ keep_tid BIGINT NOT NULL,
+ visited BOOLEAN NOT NULL DEFAULT FALSE
+ );
+
+ -- Temporary state during packing: the list of object states to pack.
+ CREATE TABLE pack_state (
+ tid BIGINT NOT NULL,
+ zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid)
+ );
+
+ -- Temporary state during packing: the list of transactions that
+ -- have at least one object state to pack.
+ CREATE TABLE pack_state_tid (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+ """
+
+ keep_history = True
+
+ _scripts = {
+ 'create_temp_pack_visit': """
+ CREATE TEMPORARY TABLE temp_pack_visit (
+ zoid BIGINT NOT NULL,
+ keep_tid BIGINT
+ );
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+ """,
+
+ 'pre_pack_follow_child_refs': """
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN temp_pack_visit USING (zoid)
+ WHERE object_ref.tid >= temp_pack_visit.keep_tid
+ )
+ """,
+
+ 'choose_pack_transaction': """
+ SELECT tid
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND packed = FALSE
+ ORDER BY tid DESC
+ LIMIT 1
+ """,
+
+ 'create_temp_undo': """
+ CREATE TEMPORARY TABLE temp_undo (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL
+ );
+ CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
+ """,
+
+ 'reset_temp_undo': "DROP TABLE temp_undo",
+
+ 'transaction_has_data': """
+ SELECT tid
+ FROM object_state
+ WHERE tid = %(tid)s
+ LIMIT 1
+ """,
+
+ 'pack_current_object': """
+ DELETE FROM current_object
+ WHERE tid = %(tid)s
+ AND zoid in (
+ SELECT pack_state.zoid
+ FROM pack_state
+ WHERE pack_state.tid = %(tid)s
+ )
+ """,
+
+ 'pack_object_state': """
+ DELETE FROM object_state
+ WHERE tid = %(tid)s
+ AND zoid in (
+ SELECT pack_state.zoid
+ FROM pack_state
+ WHERE pack_state.tid = %(tid)s
+ )
+ """,
+
+ 'pack_object_ref': """
+ DELETE FROM object_refs_added
+ WHERE tid IN (
+ SELECT tid
+ FROM transaction
+ WHERE empty = %(TRUE)s
+ );
+ DELETE FROM object_ref
+ WHERE tid IN (
+ SELECT tid
+ FROM transaction
+ WHERE empty = %(TRUE)s
+ )
+ """,
+ }
+
+ def _transaction_iterator(self, cursor):
+ """Iterate over a list of transactions returned from the database.
+
+ Each row begins with (tid, username, description, extension)
+ and may have other columns.
+ """
+ for row in cursor:
+ tid, username, description, ext = row[:4]
+ if username is None:
+ username = ''
+ else:
+ username = str(username)
+ if description is None:
+ description = ''
+ else:
+ description = str(description)
+ if ext is None:
+ ext = ''
+ else:
+ ext = str(ext)
+ yield (tid, username, description, ext) + tuple(row[4:])
+
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log, newest first.
+
+ Skips packed transactions.
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension
+ FROM transaction
+ WHERE packed = %(FALSE)s
+ AND tid != 0
+ ORDER BY tid DESC
+ """
+ self._run_script_stmt(cursor, stmt)
+ return self._transaction_iterator(cursor)
+
+
+ def iter_transactions_range(self, cursor, start=None, stop=None):
+ """Iterate over the transactions in the given range, oldest first.
+
+ Includes packed transactions.
+ Yields (tid, username, description, extension, packed)
+ for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension,
+ CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END
+ FROM transaction
+ WHERE tid >= 0
+ """
+ if start is not None:
+ stmt += " AND tid >= %(min_tid)s"
+ if stop is not None:
+ stmt += " AND tid <= %(max_tid)s"
+ stmt += " ORDER BY tid"
+ self._run_script_stmt(cursor, stmt,
+ {'min_tid': start, 'max_tid': stop})
+ return self._transaction_iterator(cursor)
+
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT 1 FROM current_object WHERE zoid = %(oid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'oid': oid})
+ if not cursor.fetchall():
+ raise KeyError(oid)
+
+ stmt = """
+ SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
+ FROM transaction
+ JOIN object_state USING (tid)
+ WHERE zoid = %(oid)s
+ AND packed = %(FALSE)s
+ ORDER BY tid DESC
+ """
+ self._run_script_stmt(cursor, stmt, {'oid': oid})
+ return self._transaction_iterator(cursor)
+
+
+ def verify_undoable(self, cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn."""
+ stmt = """
+ SELECT 1 FROM transaction
+ WHERE tid = %(undo_tid)s
+ AND packed = %(FALSE)s
+ """
+ self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if not cursor.fetchall():
+ raise UndoError("Transaction not found or packed")
+
+ # Rule: we can undo an object if the object's state in the
+ # transaction to undo matches the object's current state.
+ # If any object in the transaction does not fit that rule,
+ # refuse to undo.
+ stmt = """
+ SELECT prev_os.zoid, current_object.tid
+ FROM object_state prev_os
+ JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
+ JOIN current_object ON (cur_os.zoid = current_object.zoid
+ AND cur_os.tid = current_object.tid)
+ WHERE prev_os.tid = %(undo_tid)s
+ AND cur_os.md5 != prev_os.md5
+ """
+ self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if cursor.fetchmany():
+ raise UndoError(
+ "Some data were modified by a later transaction")
+
+ # Rule: don't allow the creation of the root object to
+ # be undone. It's hard to get it back.
+ stmt = """
+ SELECT 1
+ FROM object_state
+ WHERE tid = %(undo_tid)s
+ AND zoid = 0
+ AND prev_tid = 0
+ """
+ self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if cursor.fetchall():
+ raise UndoError("Can't undo the creation of the root object")
+
+
+ def undo(self, cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the states copied forward by the undo operation as a
+ list of (oid, old_tid).
+ """
+ stmt = self._scripts['create_temp_undo']
+ if stmt:
+ self._run_script(cursor, stmt)
+
+ stmt = """
+ DELETE FROM temp_undo;
+
+ -- Put into temp_undo the list of objects to be undone and
+ -- the tid of the transaction that has the undone state.
+ INSERT INTO temp_undo (zoid, prev_tid)
+ SELECT zoid, prev_tid
+ FROM object_state
+ WHERE tid = %(undo_tid)s;
+
+ -- Override previous undo operations within this transaction
+ -- by resetting the current_object pointer and deleting
+ -- copied states from object_state.
+ UPDATE current_object
+ SET tid = (
+ SELECT prev_tid
+ FROM object_state
+ WHERE zoid = current_object.zoid
+ AND tid = %(self_tid)s
+ )
+ WHERE zoid IN (SELECT zoid FROM temp_undo)
+ AND tid = %(self_tid)s;
+
+ DELETE FROM object_state
+ WHERE zoid IN (SELECT zoid FROM temp_undo)
+ AND tid = %(self_tid)s;
+
+ -- Copy old states forward.
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
+ prev.md5, prev.state
+ FROM temp_undo
+ JOIN current_object ON (temp_undo.zoid = current_object.zoid)
+ LEFT JOIN object_state prev
+ ON (prev.zoid = temp_undo.zoid
+ AND prev.tid = temp_undo.prev_tid);
+
+ -- List the copied states.
+ SELECT zoid, prev_tid FROM temp_undo
+ """
+ self._run_script(cursor, stmt,
+ {'undo_tid': undo_tid, 'self_tid': self_tid})
+ res = list(cursor)
+
+ stmt = self._scripts['reset_temp_undo']
+ if stmt:
+ self._run_script(cursor, stmt)
+
+ return res
+
+
+ def choose_pack_transaction(self, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ conn, cursor = self.open()
+ try:
+ stmt = self._scripts['choose_pack_transaction']
+ self._run_script(cursor, stmt, {'tid': pack_point})
+ rows = cursor.fetchall()
+ if not rows:
+ # Nothing needs to be packed.
+ return None
+ return rows[0][0]
+ finally:
+ self.close(conn, cursor)
+
+
+ def pre_pack(self, pack_tid, get_references, options):
+ """Decide what to pack.
+
+ tid specifies the most recent transaction to pack.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+
+ options is an instance of relstorage.Options.
+ The options.pack_gc flag indicates whether to run garbage collection.
+ If pack_gc is false, at least one revision of every object is kept,
+ even if nothing refers to it. Packing with pack_gc disabled can be
+ much faster.
+ """
+ conn, cursor = self.open_for_pre_pack()
+ try:
+ try:
+ if options.pack_gc:
+ log.info("pre_pack: start with gc enabled")
+ self._pre_pack_with_gc(
+ conn, cursor, pack_tid, get_references)
+ else:
+ log.info("pre_pack: start without gc")
+ self._pre_pack_without_gc(
+ conn, cursor, pack_tid)
+ conn.commit()
+
+ log.info("pre_pack: enumerating states to pack")
+ stmt = "%(TRUNCATE)s pack_state"
+ self._run_script_stmt(cursor, stmt)
+ to_remove = 0
+
+ if options.pack_gc:
+ # Pack objects with the keep flag set to false.
+ stmt = """
+ INSERT INTO pack_state (tid, zoid)
+ SELECT tid, zoid
+ FROM object_state
+ JOIN pack_object USING (zoid)
+ WHERE keep = %(FALSE)s
+ AND tid > 0
+ AND tid <= %(pack_tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
+ to_remove += cursor.rowcount
+
+ # Pack object states with the keep flag set to true.
+ stmt = """
+ INSERT INTO pack_state (tid, zoid)
+ SELECT tid, zoid
+ FROM object_state
+ JOIN pack_object USING (zoid)
+ WHERE keep = %(TRUE)s
+ AND tid > 0
+ AND tid != keep_tid
+ AND tid <= %(pack_tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'pack_tid':pack_tid})
+ to_remove += cursor.rowcount
+
+ log.info("pre_pack: enumerating transactions to pack")
+ stmt = "%(TRUNCATE)s pack_state_tid"
+ self._run_script_stmt(cursor, stmt)
+ stmt = """
+ INSERT INTO pack_state_tid (tid)
+ SELECT DISTINCT tid
+ FROM pack_state
+ """
+ cursor.execute(stmt)
+
+ log.info("pre_pack: will remove %d object state(s)",
+ to_remove)
+
+ except:
+ log.exception("pre_pack: failed")
+ conn.rollback()
+ raise
+ else:
+ log.info("pre_pack: finished successfully")
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def _pre_pack_without_gc(self, conn, cursor, pack_tid):
+ """Determine what to pack, without garbage collection.
+
+ With garbage collection disabled, there is no need to follow
+ object references.
+ """
+ # Fill the pack_object table with OIDs, but configure them
+ # all to be kept by setting keep to true.
+ log.debug("pre_pack: populating pack_object")
+ stmt = """
+ %(TRUNCATE)s pack_object;
+
+ INSERT INTO pack_object (zoid, keep, keep_tid)
+ SELECT zoid, %(TRUE)s, MAX(tid)
+ FROM object_state
+ WHERE tid > 0 AND tid <= %(pack_tid)s
+ GROUP BY zoid
+ """
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+
def _pre_pack_with_gc(self, conn, cursor, pack_tid, get_references):
    """Determine what to pack, with garbage collection.

    Populates pack_object with every OID whose latest state is at or
    before pack_tid, initially marked removable (keep = false), then
    flips 'keep' to true for objects that must survive: the root
    object, objects revised after pack_tid, and objects referenced by
    states in transactions that will not be packed.  The final
    transitive reachability pass is delegated to
    _visit_all_references.
    """
    # Some adapters create temp_pack_visit on demand (non-empty
    # script); others define it in the schema and leave this empty.
    stmt = self._scripts['create_temp_pack_visit']
    if stmt:
        self._run_script(cursor, stmt)

    # Make sure object_ref reflects all object states up to pack_tid
    # before reachability is computed.
    self.fill_object_refs(conn, cursor, get_references)

    log.info("pre_pack: filling the pack_object table")
    # Fill the pack_object table with OIDs that either will be
    # removed (if nothing references the OID) or whose history will
    # be cut.
    stmt = """
    %(TRUNCATE)s pack_object;

    INSERT INTO pack_object (zoid, keep, keep_tid)
    SELECT zoid, %(FALSE)s, MAX(tid)
    FROM object_state
    WHERE tid > 0 AND tid <= %(pack_tid)s
    GROUP BY zoid;

    -- If the root object is in pack_object, keep it.
    UPDATE pack_object SET keep = %(TRUE)s
    WHERE zoid = 0;

    -- Keep objects that have been revised since pack_tid.
    UPDATE pack_object SET keep = %(TRUE)s
    WHERE zoid IN (
        SELECT zoid
        FROM current_object
        WHERE tid > %(pack_tid)s
    );

    -- Keep objects that are still referenced by object states in
    -- transactions that will not be packed.
    -- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
    INSERT INTO temp_pack_visit (zoid)
    SELECT DISTINCT to_zoid
    FROM object_ref
    WHERE tid > %(pack_tid)s;

    UPDATE pack_object SET keep = %(TRUE)s
    WHERE zoid IN (
        SELECT zoid
        FROM temp_pack_visit
    );

    %(TRUNCATE)s temp_pack_visit;
    """
    self._run_script(cursor, stmt, {'pack_tid': pack_tid})

    # Set the 'keep' flags in pack_object: walk references from the
    # objects marked so far and keep everything reachable.
    self._visit_all_references(cursor)
+
+
def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
    """Pack.  Requires the information provided by pre_pack.

    pack_tid: pack everything at or before this transaction id.
    options: provides pack_batch_timeout (and, via _pause_pack,
        the duty-cycle settings) that bound how long the commit
        lock is held at a stretch.
    sleep: injectable sleep function (overridable for tests).
    packed_func: optional callback invoked as packed_func(oid, tid)
        for each object state removed, after each batch commits.
    """

    # Read committed mode is sufficient.
    conn, cursor = self.open()
    try:
        try:
            # List the transactions that still need packing: those at
            # or before pack_tid that are either not yet packed or
            # still have removable states recorded in pack_state_tid.
            stmt = """
            SELECT transaction.tid,
                CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
                CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
            FROM transaction
            LEFT JOIN pack_state_tid ON (
                transaction.tid = pack_state_tid.tid)
            WHERE transaction.tid > 0
                AND transaction.tid <= %(pack_tid)s
                AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
            """
            self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
            tid_rows = list(cursor)
            tid_rows.sort()  # oldest first

            log.info("pack: will pack %d transaction(s)", len(tid_rows))

            # Create temp_pack_visit if this adapter builds it on the
            # fly (script is empty for adapters that define it in the
            # schema).
            stmt = self._scripts['create_temp_pack_visit']
            if stmt:
                self._run_script(cursor, stmt)

            # Hold the commit lock while packing to prevent deadlocks.
            # Pack in small batches of transactions in order to minimize
            # the interruption of concurrent write operations.
            start = time.time()
            packed_list = []
            self._hold_commit_lock(cursor)
            for tid, packed, has_removable in tid_rows:
                self._pack_transaction(
                    cursor, pack_tid, tid, packed, has_removable,
                    packed_list)
                if time.time() >= start + options.pack_batch_timeout:
                    # Batch deadline reached: commit what we have,
                    # notify the callback, release the lock so writers
                    # can proceed, pause, then re-acquire and continue.
                    conn.commit()
                    if packed_func is not None:
                        for oid, tid in packed_list:
                            packed_func(oid, tid)
                    del packed_list[:]
                    self._release_commit_lock(cursor)
                    self._pause_pack(sleep, options, start)
                    self._hold_commit_lock(cursor)
                    start = time.time()
            # Flush callback notifications for the final partial batch.
            if packed_func is not None:
                for oid, tid in packed_list:
                    packed_func(oid, tid)
            packed_list = None

            # Remove now-unneeded pack state; commits internally.
            self._pack_cleanup(conn, cursor)

        except:
            log.exception("pack: failed")
            conn.rollback()
            raise

        else:
            log.info("pack: finished successfully")
            conn.commit()

    finally:
        self.close(conn, cursor)
+
+
def _pack_transaction(self, cursor, pack_tid, tid, packed,
        has_removable, packed_list):
    """Pack one transaction.  Requires populated pack tables.

    Caller must hold the commit lock.  Appends (oid, tid) for each
    removed state to packed_list; marks the transaction row as packed
    and, when no object states remain, as empty.
    """
    log.debug("pack: transaction %d: packing", tid)
    removed_objects = 0
    removed_states = 0

    if has_removable:
        # Remove current_object rows for objects being garbage
        # collected in this transaction (adapter-specific script).
        stmt = self._scripts['pack_current_object']
        self._run_script_stmt(cursor, stmt, {'tid': tid})
        removed_objects = cursor.rowcount

        # Remove the object states scheduled for removal.
        stmt = self._scripts['pack_object_state']
        self._run_script_stmt(cursor, stmt, {'tid': tid})
        removed_states = cursor.rowcount

        # Terminate prev_tid chains: surviving states can no longer
        # point back at states removed by this pack.
        stmt = """
        UPDATE object_state SET prev_tid = 0
        WHERE prev_tid = %(tid)s
            AND tid <= %(pack_tid)s
        """
        self._run_script_stmt(cursor, stmt,
            {'pack_tid': pack_tid, 'tid': tid})

        # Report the OIDs whose state was removed in this transaction.
        stmt = """
        SELECT pack_state.zoid
        FROM pack_state
        WHERE pack_state.tid = %(tid)s
        """
        self._run_script_stmt(cursor, stmt, {'tid': tid})
        for (oid,) in cursor:
            packed_list.append((oid, tid))

    # Find out whether the transaction is empty
    stmt = self._scripts['transaction_has_data']
    self._run_script_stmt(cursor, stmt, {'tid': tid})
    empty = not list(cursor)

    # mark the transaction packed and possibly empty
    if empty:
        clause = 'empty = %(TRUE)s'
        state = 'empty'
    else:
        clause = 'empty = %(FALSE)s'
        state = 'not empty'
    stmt = "UPDATE transaction SET packed = %(TRUE)s, " + clause
    stmt += " WHERE tid = %(tid)s"
    self._run_script_stmt(cursor, stmt, {'tid': tid})

    log.debug(
        "pack: transaction %d (%s): removed %d object(s) and %d state(s)",
        tid, state, removed_objects, removed_states)
+
+
def _pack_cleanup(self, conn, cursor):
    """Remove unneeded table rows after packing.

    Called by pack() while the commit lock is held.  Performs the
    lock-sensitive cleanup first, then releases the lock before
    clearing the temporary pack state tables.
    """
    # commit the work done so far
    conn.commit()
    # Cycle the lock so any writers queued behind the pack get a turn
    # before cleanup begins.
    self._release_commit_lock(cursor)
    self._hold_commit_lock(cursor)
    log.info("pack: cleaning up")

    log.debug("pack: removing unused object references")
    stmt = self._scripts['pack_object_ref']
    self._run_script(cursor, stmt)

    log.debug("pack: removing empty packed transactions")
    stmt = """
    DELETE FROM transaction
    WHERE packed = %(TRUE)s
        AND empty = %(TRUE)s
    """
    self._run_script_stmt(cursor, stmt)

    # perform cleanup that does not require the commit lock
    conn.commit()
    self._release_commit_lock(cursor)

    log.debug("pack: clearing temporary pack state")
    # These tables only hold scratch state between pre_pack and pack;
    # truncate them now that the pack is done.
    for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
        stmt = '%(TRUNCATE)s ' + _table
        self._run_script_stmt(cursor, stmt)
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2009-09-23 07:33:30 UTC (rev 104443)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2009-09-23 07:38:44 UTC (rev 104444)
@@ -53,7 +53,7 @@
import time
from ZODB.POSException import StorageError
-from common import Adapter
+from relstorage.adapters.historypreserving import HistoryPreservingAdapter
log = logging.getLogger("relstorage.adapters.mysql")
@@ -68,10 +68,10 @@
close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
-class MySQLAdapter(Adapter):
+class MySQLAdapter(HistoryPreservingAdapter):
"""MySQL adapter for RelStorage."""
- _scripts = Adapter._scripts.copy()
+ _scripts = HistoryPreservingAdapter._scripts.copy()
# Work around a MySQL performance bug by avoiding an expensive subquery.
# See: http://mail.zope.org/pipermail/zodb-dev/2008-May/011880.html
# http://bugs.mysql.com/bug.php?id=28257
@@ -90,7 +90,7 @@
# Note: UPDATE must be the last statement in the script
# because it returns a value.
- 'prepack_follow_child_refs': """
+ 'pre_pack_follow_child_refs': """
%(TRUNCATE)s temp_pack_child;
INSERT INTO temp_pack_child
@@ -650,7 +650,7 @@
"""Open a connection to be used for the pre-pack phase.
Returns (conn, cursor).
- This overrides the method by the same name in common.Adapter.
+ This overrides a method.
"""
conn, cursor = self.open(transaction_mode=None)
try:
@@ -666,7 +666,7 @@
def _hold_commit_lock(self, cursor):
"""Hold the commit lock.
- This overrides the method by the same name in common.Adapter.
+ This overrides a method.
"""
cursor.execute("SELECT GET_LOCK(CONCAT(DATABASE(), '.commit'), %s)",
(commit_lock_timeout,))
@@ -678,7 +678,7 @@
def _release_commit_lock(self, cursor):
"""Release the commit lock.
- This overrides the method by the same name in common.Adapter.
+ This overrides a method.
"""
cursor.execute("SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.commit'))")
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2009-09-23 07:33:30 UTC (rev 104443)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2009-09-23 07:38:44 UTC (rev 104444)
@@ -20,7 +20,7 @@
import cx_Oracle
from ZODB.POSException import StorageError
-from common import Adapter
+from relstorage.adapters.historypreserving import HistoryPreservingAdapter
log = logging.getLogger("relstorage.adapters.oracle")
@@ -51,7 +51,7 @@
return value
-class OracleAdapter(Adapter):
+class OracleAdapter(HistoryPreservingAdapter):
"""Oracle adapter for RelStorage."""
_script_vars = {
@@ -68,7 +68,7 @@
'max_tid': ':max_tid',
}
- _scripts = Adapter._scripts.copy()
+ _scripts = HistoryPreservingAdapter._scripts.copy()
_scripts.update({
'choose_pack_transaction': """
SELECT MAX(tid)
@@ -117,7 +117,10 @@
def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
"""Execute a statement from a script with the given parameters.
- This overrides the method by the same name in common.Adapter.
+ params should be either an empty tuple (no parameters) or
+ a map.
+
+ This overrides a method.
"""
if generic_params:
# Oracle raises ORA-01036 if the parameter map contains extra keys,
@@ -140,7 +143,20 @@
stmt, params)
raise
+ def _run_many(self, cursor, stmt, items):
+ """Execute a statement repeatedly. Items should be a list of tuples.
+ stmt should use '%s' parameter format. Overrides a method.
+ """
+ # replace '%s' with ':n'
+ matches = []
+ def replace(match):
+ matches.append(None)
+ return ':%d' % len(matches)
+ stmt = re.sub('%s', replace, stmt)
+
+ cursor.executemany(stmt, items)
+
def create_schema(self, cursor):
"""Create the database tables."""
stmt = """
@@ -766,21 +782,6 @@
# No action needed
pass
-
- def _add_object_ref_rows(self, cursor, add_rows):
- """Add rows to object_ref.
-
- The input rows are tuples containing (from_zoid, tid, to_zoid).
-
- This overrides the method by the same name in common.Adapter.
- """
- stmt = """
- INSERT INTO object_ref (zoid, tid, to_zoid)
- VALUES (:1, :2, :3)
- """
- cursor.executemany(stmt, add_rows)
-
-
_poll_query = "SELECT MAX(tid) FROM transaction"
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-23 07:33:30 UTC (rev 104443)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-23 07:38:44 UTC (rev 104444)
@@ -19,7 +19,7 @@
import re
from ZODB.POSException import StorageError
-from common import Adapter
+from relstorage.adapters.historypreserving import HistoryPreservingAdapter
log = logging.getLogger("relstorage.adapters.postgresql")
@@ -28,7 +28,7 @@
disconnected_exceptions = (psycopg2.OperationalError, psycopg2.InterfaceError)
-class PostgreSQLAdapter(Adapter):
+class PostgreSQLAdapter(HistoryPreservingAdapter):
"""PostgreSQL adapter for RelStorage."""
def __init__(self, dsn=''):
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-09-23 07:33:30 UTC (rev 104443)
+++ relstorage/trunk/relstorage/relstorage.py 2009-09-23 07:38:44 UTC (rev 104444)
@@ -782,10 +782,10 @@
return ''
def supportsUndo(self):
- return True
+ return self._adapter.keep_history
def supportsTransactionalUndo(self):
- return True
+ return self._adapter.keep_history
def undoLog(self, first=0, last=-20, filter=None):
if last < 0:
More information about the checkins
mailing list