[Checkins] SVN: relstorage/trunk/relstorage/ Checkpoint: refactoring adapters from inheritance to composition.
Shane Hathaway
shane at hathawaymix.org
Wed Sep 23 17:12:58 EDT 2009
Log message for revision 104464:
Checkpoint: refactoring adapters from inheritance to composition.
Changed:
D relstorage/trunk/relstorage/adapters/abstract.py
A relstorage/trunk/relstorage/adapters/connmanager.py
A relstorage/trunk/relstorage/adapters/dbiter.py
D relstorage/trunk/relstorage/adapters/historyfree.py
D relstorage/trunk/relstorage/adapters/historypreserving.py
A relstorage/trunk/relstorage/adapters/interfaces.py
A relstorage/trunk/relstorage/adapters/loadstore.py
A relstorage/trunk/relstorage/adapters/locker.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
A relstorage/trunk/relstorage/adapters/packundo.py
A relstorage/trunk/relstorage/adapters/poller.py
U relstorage/trunk/relstorage/adapters/postgresql.py
A relstorage/trunk/relstorage/adapters/schema.py
A relstorage/trunk/relstorage/adapters/scriptrunner.py
A relstorage/trunk/relstorage/adapters/stats.py
A relstorage/trunk/relstorage/adapters/txncontrol.py
U relstorage/trunk/relstorage/relstorage.py
-=-
Deleted: relstorage/trunk/relstorage/adapters/abstract.py
===================================================================
--- relstorage/trunk/relstorage/adapters/abstract.py 2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/abstract.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -1,414 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Code common to most adapters."""
-
-import logging
-import time
-
-try:
- from hashlib import md5
-except ImportError:
- from md5 import new as md5
-
-
-log = logging.getLogger(__name__)
-
-# Notes about adapters:
-#
-# An adapter must not hold a connection, cursor, or database state, because
-# RelStorage opens multiple concurrent connections using a single adapter
-# instance.
-# Within the context of an adapter, all OID and TID values are integers,
-# not binary strings, except as noted.
-
-class AbstractAdapter(object):
- """Common code for a database adapter.
- """
-
- keep_history = None # True or False
- verify_sane_database = False
-
- # _script_vars contains replacements for statements in scripts.
- # These are correct for PostgreSQL and MySQL but not for Oracle.
- _script_vars = {
- 'TRUE': 'TRUE',
- 'FALSE': 'FALSE',
- 'OCTET_LENGTH': 'OCTET_LENGTH',
- 'TRUNCATE': 'TRUNCATE',
- 'oid': '%(oid)s',
- 'tid': '%(tid)s',
- 'pack_tid': '%(pack_tid)s',
- 'undo_tid': '%(undo_tid)s',
- 'self_tid': '%(self_tid)s',
- 'min_tid': '%(min_tid)s',
- 'max_tid': '%(max_tid)s',
- }
-
- def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
- """Execute a statement from a script with the given parameters.
-
- params should be either an empty tuple (no parameters) or
- a map.
-
- Subclasses may override this.
- The input statement is generic and needs to be transformed
- into a database-specific statement.
- """
- stmt = generic_stmt % self._script_vars
- try:
- cursor.execute(stmt, generic_params)
- except:
- log.warning("script statement failed: %r; parameters: %r",
- stmt, generic_params)
- raise
-
- def _run_script(self, cursor, script, params=()):
- """Execute a series of statements in the database.
-
- params should be either an empty tuple (no parameters) or
- a map.
-
- The statements are transformed by _run_script_stmt
- before execution.
- """
- lines = []
- for line in script.split('\n'):
- line = line.strip()
- if not line or line.startswith('--'):
- continue
- if line.endswith(';'):
- line = line[:-1]
- lines.append(line)
- stmt = '\n'.join(lines)
- self._run_script_stmt(cursor, stmt, params)
- lines = []
- else:
- lines.append(line)
- if lines:
- stmt = '\n'.join(lines)
- self._run_script_stmt(cursor, stmt, params)
-
- def _run_many(self, cursor, stmt, items):
- """Execute a statement repeatedly. Items should be a list of tuples.
-
- stmt should use '%s' parameter format. Overridden by adapters
- that use a different parameter format.
- """
- cursor.executemany(stmt, items)
-
- def _open_and_call(self, callback):
- """Call a function with an open connection and cursor.
-
- If the function returns, commits the transaction and returns the
- result returned by the function.
- If the function raises an exception, aborts the transaction
- then propagates the exception.
- """
- conn, cursor = self.open()
- try:
- try:
- res = callback(conn, cursor)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- return res
- finally:
- self.close(conn, cursor)
-
- def md5sum(self, data):
- if data is not None:
- return md5(data).hexdigest()
- else:
- # George Bailey object
- return None
-
- def iter_objects(self, cursor, tid):
- """Iterate over object states in a transaction.
-
- Yields (oid, prev_tid, state) for each object state.
- """
- stmt = """
- SELECT zoid, state
- FROM object_state
- WHERE tid = %(tid)s
- ORDER BY zoid
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- for oid, state in cursor:
- if hasattr(state, 'read'):
- # Oracle
- state = state.read()
- yield oid, state
-
- def open_for_pre_pack(self):
- """Open a connection to be used for the pre-pack phase.
- Returns (conn, cursor).
-
- Subclasses may override this.
- """
- return self.open()
-
- def _hold_commit_lock(self, cursor):
- """Hold the commit lock for packing"""
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
-
- def _release_commit_lock(self, cursor):
- """Release the commit lock during packing"""
- # no action needed
- pass
-
-
- def fill_object_refs(self, conn, cursor, get_references):
- """Update the object_refs table by analyzing new transactions."""
- if self.keep_history:
- stmt = """
- SELECT transaction.tid
- FROM transaction
- LEFT JOIN object_refs_added
- ON (transaction.tid = object_refs_added.tid)
- WHERE object_refs_added.tid IS NULL
- ORDER BY transaction.tid
- """
- else:
- stmt = """
- SELECT transaction.tid
- FROM (SELECT DISTINCT tid FROM object_state) AS transaction
- LEFT JOIN object_refs_added
- ON (transaction.tid = object_refs_added.tid)
- WHERE object_refs_added.tid IS NULL
- ORDER BY transaction.tid
- """
-
- self._run_script_stmt(cursor, stmt)
- tids = [tid for (tid,) in cursor]
- if tids:
- added = 0
- log.info("discovering references from objects in %d "
- "transaction(s)" % len(tids))
- for tid in tids:
- added += self._add_refs_for_tid(cursor, tid, get_references)
- if added >= 10000:
- # save the work done so far
- conn.commit()
- added = 0
- if added:
- conn.commit()
-
- def _add_refs_for_tid(self, cursor, tid, get_references):
- """Fill object_refs with all states for a transaction.
-
- Returns the number of references added.
- """
- log.debug("pre_pack: transaction %d: computing references ", tid)
- from_count = 0
-
- stmt = """
- SELECT zoid, state
- FROM object_state
- WHERE tid = %(tid)s
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
- add_rows = [] # [(from_oid, tid, to_oid)]
- for from_oid, state in cursor:
- if hasattr(state, 'read'):
- # Oracle
- state = state.read()
- if state:
- from_count += 1
- try:
- to_oids = get_references(str(state))
- except:
- log.error("pre_pack: can't unpickle "
- "object %d in transaction %d; state length = %d" % (
- from_oid, tid, len(state)))
- raise
- if self.keep_history:
- for to_oid in to_oids:
- add_rows.append((from_oid, tid, to_oid))
- else:
- for to_oid in to_oids:
- add_rows.append((from_oid, to_oid))
-
- if self.keep_history:
- stmt = """
- INSERT INTO object_ref (zoid, tid, to_zoid)
- VALUES (%s, %s, %s)
- """
- self._run_many(cursor, stmt, add_rows)
-
- else:
- stmt = """
- DELETE FROM object_ref
- WHERE zoid in (
- SELECT zoid
- FROM object_state
- WHERE tid = %(tid)s
- )
- """
- self._run_script(cursor, stmt, {'tid': tid})
-
- stmt = """
- INSERT INTO object_ref (zoid, to_zoid)
- VALUES (%s, %s)
- """
- self._run_many(cursor, stmt, add_rows)
-
- # The references have been computed for this transaction.
- stmt = """
- INSERT INTO object_refs_added (tid)
- VALUES (%(tid)s)
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
- to_count = len(add_rows)
- log.debug("pre_pack: transaction %d: has %d reference(s) "
- "from %d object(s)", tid, to_count, from_count)
- return to_count
-
-
- def _visit_all_references(self, cursor):
- """Visit all references in pack_object and set the keep flags.
- """
- # Each of the objects to be kept might refer to other objects.
- # Mark the referenced objects to be kept as well. Do this
- # repeatedly until all references have been satisfied.
- pass_num = 1
- while True:
- log.info("pre_pack: following references, pass %d", pass_num)
-
- # Make a list of all parent objects that still need to be
- # visited. Then set pack_object.visited for all pack_object
- # rows with keep = true.
- stmt = """
- %(TRUNCATE)s temp_pack_visit;
-
- INSERT INTO temp_pack_visit (zoid, keep_tid)
- SELECT zoid, keep_tid
- FROM pack_object
- WHERE keep = %(TRUE)s
- AND visited = %(FALSE)s;
-
- UPDATE pack_object SET visited = %(TRUE)s
- WHERE keep = %(TRUE)s
- AND visited = %(FALSE)s
- """
- self._run_script(cursor, stmt)
- visit_count = cursor.rowcount
-
- if self.verify_sane_database:
- # Verify the update actually worked.
- # MySQL 5.1.23 fails this test; 5.1.24 passes.
- stmt = """
- SELECT 1
- FROM pack_object
- WHERE keep = %(TRUE)s AND visited = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt)
- if list(cursor):
- raise AssertionError(
- "database failed to update pack_object")
-
- log.debug("pre_pack: checking references from %d object(s)",
- visit_count)
-
- # Visit the children of all parent objects that were
- # just visited.
- stmt = self._scripts['pre_pack_follow_child_refs']
- self._run_script(cursor, stmt)
- found_count = cursor.rowcount
-
- log.debug("pre_pack: found %d more referenced object(s) in "
- "pass %d", found_count, pass_num)
- if not found_count:
- # No new references detected.
- break
- else:
- pass_num += 1
-
-
- def _pause_pack(self, sleep, options, start):
- """Pause packing to allow concurrent commits."""
- elapsed = time.time() - start
- if elapsed == 0.0:
- # Compensate for low timer resolution by
- # assuming that at least 10 ms elapsed.
- elapsed = 0.01
- duty_cycle = options.pack_duty_cycle
- if duty_cycle > 0.0 and duty_cycle < 1.0:
- delay = min(options.pack_max_delay,
- elapsed * (1.0 / duty_cycle - 1.0))
- if delay > 0:
- log.debug('pack: sleeping %.4g second(s)', delay)
- sleep(delay)
-
-
- def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
- """Polls for new transactions.
-
- conn and cursor must have been created previously by open_for_load().
- prev_polled_tid is the tid returned at the last poll, or None
- if this is the first poll. If ignore_tid is not None, changes
- committed in that transaction will not be included in the list
- of changed OIDs.
-
- Returns (changed_oids, new_polled_tid).
- """
- # find out the tid of the most recent transaction.
- cursor.execute(self._poll_query)
- new_polled_tid = cursor.fetchone()[0]
-
- if prev_polled_tid is None:
- # This is the first time the connection has polled.
- return None, new_polled_tid
-
- if new_polled_tid == prev_polled_tid:
- # No transactions have been committed since prev_polled_tid.
- return (), new_polled_tid
-
- if self.keep_history:
- stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
- else:
- stmt = "SELECT 1 FROM object_state WHERE tid <= %(tid)s LIMIT 1"
- cursor.execute(intern(stmt % self._script_vars),
- {'tid': prev_polled_tid})
- rows = cursor.fetchall()
- if not rows:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
-
- # Get the list of changed OIDs and return it.
- if ignore_tid is None:
- stmt = """
- SELECT zoid
- FROM current_object
- WHERE tid > %(tid)s
- """
- cursor.execute(intern(stmt % self._script_vars),
- {'tid': prev_polled_tid})
- else:
- stmt = """
- SELECT zoid
- FROM current_object
- WHERE tid > %(tid)s
- AND tid != %(self_tid)s
- """
- cursor.execute(intern(stmt % self._script_vars),
- {'tid': prev_polled_tid, 'self_tid': ignore_tid})
- oids = [oid for (oid,) in cursor]
-
- return oids, new_polled_tid
Added: relstorage/trunk/relstorage/adapters/connmanager.py
===================================================================
--- relstorage/trunk/relstorage/adapters/connmanager.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/connmanager.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,63 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.adapters.interfaces import IConnectionManager
+from zope.interface import implements
+
+class AbstractConnectionManager(object):
+ """Abstract base class for connection management.
+
+ Responsible for opening and closing database connections.
+ """
+ implements(IConnectionManager)
+
+ # close_exceptions contains the exception types to ignore
+ # when the adapter attempts to close a database connection.
+ close_exceptions = ()
+
+ def open(self):
+ """Open a database connection and return (conn, cursor)."""
+ raise NotImplementedError
+
+ def close(self, conn, cursor):
+ """Close a connection and cursor, ignoring certain errors.
+ """
+ for obj in (cursor, conn):
+ if obj is not None:
+ try:
+ obj.close()
+ except self.close_exceptions:
+ pass
+
+ def open_and_call(self, callback):
+ """Call a function with an open connection and cursor.
+
+ If the function returns, commits the transaction and returns the
+ result returned by the function.
+ If the function raises an exception, aborts the transaction
+ then propagates the exception.
+ """
+ conn, cursor = self.open()
+ try:
+ try:
+ res = callback(conn, cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ return res
+ finally:
+ self.close(conn, cursor)
+
Added: relstorage/trunk/relstorage/adapters/dbiter.py
===================================================================
--- relstorage/trunk/relstorage/adapters/dbiter.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/dbiter.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,187 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.adapters.interfaces import IDatabaseIterator
+from zope.interface import implements
+
+class DatabaseIterator(object):
+ """Abstract base class for database iteration.
+ """
+
+ def __init__(self, runner):
+ self.runner = runner
+
+ def iter_objects(self, cursor, tid):
+ """Iterate over object states in a transaction.
+
+ Yields (oid, prev_tid, state) for each object state.
+ """
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ ORDER BY zoid
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+ for oid, state in cursor:
+ if hasattr(state, 'read'):
+ # Oracle
+ state = state.read()
+ yield oid, state
+
+
+class HistoryPreservingDatabaseIterator(DatabaseIterator):
+ implements(IDatabaseIterator)
+
+ def _transaction_iterator(self, cursor):
+ """Iterate over a list of transactions returned from the database.
+
+ Each row begins with (tid, username, description, extension)
+ and may have other columns.
+ """
+ for row in cursor:
+ tid, username, description, ext = row[:4]
+ if username is None:
+ username = ''
+ else:
+ username = str(username)
+ if description is None:
+ description = ''
+ else:
+ description = str(description)
+ if ext is None:
+ ext = ''
+ else:
+ ext = str(ext)
+ yield (tid, username, description, ext) + tuple(row[4:])
+
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log, newest first.
+
+ Skips packed transactions.
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension
+ FROM transaction
+ WHERE packed = %(FALSE)s
+ AND tid != 0
+ ORDER BY tid DESC
+ """
+ self.runner.run_script_stmt(cursor, stmt)
+ return self._transaction_iterator(cursor)
+
+
+ def iter_transactions_range(self, cursor, start=None, stop=None):
+ """Iterate over the transactions in the given range, oldest first.
+
+ Includes packed transactions.
+ Yields (tid, username, description, extension, packed)
+ for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension,
+ CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END
+ FROM transaction
+ WHERE tid >= 0
+ """
+ if start is not None:
+ stmt += " AND tid >= %(min_tid)s"
+ if stop is not None:
+ stmt += " AND tid <= %(max_tid)s"
+ stmt += " ORDER BY tid"
+ self.runner.run_script_stmt(cursor, stmt,
+ {'min_tid': start, 'max_tid': stop})
+ return self._transaction_iterator(cursor)
+
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT 1 FROM current_object WHERE zoid = %(oid)s
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
+ if not cursor.fetchall():
+ raise KeyError(oid)
+
+ stmt = """
+ SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
+ FROM transaction
+ JOIN object_state USING (tid)
+ WHERE zoid = %(oid)s
+ AND packed = %(FALSE)s
+ ORDER BY tid DESC
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
+ return self._transaction_iterator(cursor)
+
+
+class HistoryFreeDatabaseIterator(DatabaseIterator):
+ implements(IDatabaseIterator)
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log, newest first.
+
+ Skips packed transactions.
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ ORDER BY tid DESC
+ """
+ self.runner.run_script_stmt(cursor, stmt)
+ return ((tid, '', '', '') for (tid,) in cursor)
+
+ def iter_transactions_range(self, cursor, start=None, stop=None):
+ """Iterate over the transactions in the given range, oldest first.
+
+ Includes packed transactions.
+ Yields (tid, username, description, extension, packed)
+ for each transaction.
+ """
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid > 0
+ """
+ if start is not None:
+ stmt += " AND tid >= %(min_tid)s"
+ if stop is not None:
+ stmt += " AND tid <= %(max_tid)s"
+ stmt += " ORDER BY tid"
+ self.runner.run_script_stmt(cursor, stmt,
+ {'min_tid': start, 'max_tid': stop})
+ return ((tid, '', '', '', True) for (tid,) in cursor)
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT tid, %(OCTET_LENGTH)s(state)
+ FROM object_state
+ WHERE zoid = %(oid)s
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
+ return ((tid, '', '', '', size) for (tid, size) in cursor)
Deleted: relstorage/trunk/relstorage/adapters/historyfree.py
===================================================================
--- relstorage/trunk/relstorage/adapters/historyfree.py 2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/historyfree.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -1,310 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Code common to history-free adapters."""
-
-
-import logging
-import time
-from relstorage.adapters.abstract import AbstractAdapter
-from ZODB.POSException import UndoError
-
-log = logging.getLogger(__name__)
-
-
-class HistoryFreeAdapter(AbstractAdapter):
- """An abstract adapter that does not retain history.
-
- Derivatives should have at least the following schema::
-
- -- All object states in all transactions.
- CREATE TABLE object_state (
- zoid BIGINT NOT NULL PRIMARY KEY,
- tid BIGINT NOT NULL CHECK (tid > 0),
- state BYTEA
- );
-
- -- A list of referenced OIDs from each object_state.
- -- This table is populated as needed during garbage collection.
- CREATE TABLE object_ref (
- zoid BIGINT NOT NULL,
- to_zoid BIGINT NOT NULL,
- PRIMARY KEY (zoid, to_zoid)
- );
-
- -- The object_refs_added table tracks whether object_refs has
- -- been populated for all states in a given transaction.
- -- An entry is added only when the work is finished.
- CREATE TABLE object_refs_added (
- tid BIGINT NOT NULL PRIMARY KEY
- );
-
- -- Temporary state during garbage collection:
- -- The list of all objects, a flag signifying whether
- -- the object should be kept, and a flag signifying whether
- -- the object's references have been visited.
- -- The keep_tid field specifies the current revision of the object.
- CREATE TABLE pack_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- keep BOOLEAN NOT NULL DEFAULT FALSE,
- keep_tid BIGINT NOT NULL,
- visited BOOLEAN NOT NULL DEFAULT FALSE
- );
- """
-
- keep_history = False
-
- _scripts = {
- 'create_temp_pack_visit': """
- CREATE TEMPORARY TABLE temp_pack_visit (
- zoid BIGINT NOT NULL
- );
- CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
- """,
-
- 'pre_pack_follow_child_refs': """
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE keep = %(FALSE)s
- AND zoid IN (
- SELECT DISTINCT to_zoid
- FROM object_ref
- JOIN temp_pack_visit USING (zoid)
- )
- """,
- }
-
- def iter_transactions(self, cursor):
- """Iterate over the transaction log, newest first.
-
- Skips packed transactions.
- Yields (tid, username, description, extension) for each transaction.
- """
- stmt = """
- SELECT DISTINCT tid
- FROM object_state
- ORDER BY tid DESC
- """
- self._run_script_stmt(cursor, stmt)
- return ((tid, '', '', '') for (tid,) in cursor)
-
- def iter_transactions_range(self, cursor, start=None, stop=None):
- """Iterate over the transactions in the given range, oldest first.
-
- Includes packed transactions.
- Yields (tid, username, description, extension, packed)
- for each transaction.
- """
- stmt = """
- SELECT DISTINCT tid
- FROM object_state
- WHERE tid > 0
- """
- if start is not None:
- stmt += " AND tid >= %(min_tid)s"
- if stop is not None:
- stmt += " AND tid <= %(max_tid)s"
- stmt += " ORDER BY tid"
- self._run_script_stmt(cursor, stmt,
- {'min_tid': start, 'max_tid': stop})
- return ((tid, '', '', '', True) for (tid,) in cursor)
-
- def iter_object_history(self, cursor, oid):
- """Iterate over an object's history.
-
- Raises KeyError if the object does not exist.
- Yields (tid, username, description, extension, pickle_size)
- for each modification.
- """
- stmt = """
- SELECT tid, %(OCTET_LENGTH)s(state)
- FROM object_state
- WHERE zoid = %(oid)s
- """
- self._run_script_stmt(cursor, stmt, {'oid': oid})
- return ((tid, '', '', '', size) for (tid, size) in cursor)
-
- def verify_undoable(self, cursor, undo_tid):
- """Raise UndoError if it is not safe to undo the specified txn."""
- raise UndoError("Undo is not supported by this storage")
-
- def undo(self, cursor, undo_tid, self_tid):
- """Undo a transaction.
-
- Parameters: "undo_tid", the integer tid of the transaction to undo,
- and "self_tid", the integer tid of the current transaction.
-
- Returns the list of OIDs undone.
- """
- raise UndoError("Undo is not supported by this storage")
-
- def choose_pack_transaction(self, pack_point):
- """Return the transaction before or at the specified pack time.
-
- Returns None if there is nothing to pack.
- """
- return 1
-
-
- def pre_pack(self, pack_tid, get_references, options):
- """Decide what the garbage collector should delete.
-
- pack_tid is ignored.
-
- get_references is a function that accepts a pickled state and
- returns a set of OIDs that state refers to.
-
- options is an instance of relstorage.Options.
- The options.pack_gc flag indicates whether to run garbage collection.
- If pack_gc is false, this method does nothing.
- """
- if not options.pack_gc:
- log.warning("pre_pack: garbage collection is disabled on a "
- "history-free storage, so doing nothing")
- return
-
- conn, cursor = self.open_for_pre_pack()
- try:
- try:
- self._pre_pack_main(conn, cursor, get_references)
- except:
- log.exception("pre_pack: failed")
- conn.rollback()
- raise
- else:
- conn.commit()
- log.info("pre_pack: finished successfully")
- finally:
- self.close(conn, cursor)
-
-
- def _pre_pack_main(self, conn, cursor, get_references):
- """Determine what to garbage collect.
- """
- stmt = self._scripts['create_temp_pack_visit']
- if stmt:
- self._run_script(cursor, stmt)
-
- self.fill_object_refs(conn, cursor, get_references)
-
- log.info("pre_pack: filling the pack_object table")
- # Fill the pack_object table with all known OIDs.
- stmt = """
- %(TRUNCATE)s pack_object;
-
- INSERT INTO pack_object (zoid, keep_tid)
- SELECT zoid, tid
- FROM object_state;
-
- -- Keep the root object
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE zoid = 0;
- """
- self._run_script(cursor, stmt)
-
- # Set the 'keep' flags in pack_object
- self._visit_all_references(cursor)
-
-
- def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
- """Run garbage collection.
-
- Requires the information provided by _pre_gc.
- """
-
- # Read committed mode is sufficient.
- conn, cursor = self.open()
- try:
- try:
- stmt = """
- SELECT zoid, keep_tid
- FROM pack_object
- WHERE keep = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt)
- to_remove = list(cursor)
-
- log.info("pack: will remove %d object(s)", len(to_remove))
-
- # Hold the commit lock while packing to prevent deadlocks.
- # Pack in small batches of transactions in order to minimize
- # the interruption of concurrent write operations.
- start = time.time()
- packed_list = []
- self._hold_commit_lock(cursor)
-
- for item in to_remove:
- oid, tid = item
- stmt = """
- DELETE FROM object_state
- WHERE zoid = %(oid)s
- AND tid = %(tid)s
- """
- self._run_script_stmt(
- cursor, stmt, {'oid': oid, 'tid': tid})
- packed_list.append(item)
-
- if time.time() >= start + options.pack_batch_timeout:
- conn.commit()
- if packed_func is not None:
- for oid, tid in packed_list:
- packed_func(oid, tid)
- del packed_list[:]
- self._release_commit_lock(cursor)
- self._pause_pack(sleep, options, start)
- self._hold_commit_lock(cursor)
- start = time.time()
-
- if packed_func is not None:
- for oid, tid in packed_list:
- packed_func(oid, tid)
- packed_list = None
-
- self._pack_cleanup(conn, cursor)
-
- except:
- log.exception("pack: failed")
- conn.rollback()
- raise
-
- else:
- log.info("pack: finished successfully")
- conn.commit()
-
- finally:
- self.close(conn, cursor)
-
-
- def _pack_cleaup(self, conn, cursor):
- # commit the work done so far
- conn.commit()
- self._release_commit_lock(cursor)
- self._hold_commit_lock(cursor)
- log.info("pack: cleaning up")
-
- stmt = """
- DELETE FROM object_refs_added
- WHERE tid NOT IN (
- SELECT DISTINCT tid
- FROM object_state
- );
-
- DELETE FROM object_ref
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = %(FALSE)s
- );
-
- %(TRUNCATE)s pack_object
- """
- self._run_script(cursor, stmt)
Deleted: relstorage/trunk/relstorage/adapters/historypreserving.py
===================================================================
--- relstorage/trunk/relstorage/adapters/historypreserving.py 2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/historypreserving.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -1,724 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Code common to history-preserving adapters."""
-
-
-import logging
-import time
-from relstorage.adapters.abstract import AbstractAdapter
-from ZODB.POSException import UndoError
-
-log = logging.getLogger(__name__)
-
-
-class HistoryPreservingAdapter(AbstractAdapter):
- """An abstract adapter that retains history.
-
- Derivatives should have at least the following schema::
-
- -- The list of all transactions in the database
- CREATE TABLE transaction (
- tid BIGINT NOT NULL PRIMARY KEY,
- packed BOOLEAN NOT NULL DEFAULT FALSE,
- empty BOOLEAN NOT NULL DEFAULT FALSE,
- username BYTEA NOT NULL,
- description BYTEA NOT NULL,
- extension BYTEA
- );
-
- -- All object states in all transactions. Note that md5 and state
- -- can be null to represent object uncreation.
- CREATE TABLE object_state (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL REFERENCES transaction
- CHECK (tid > 0),
- PRIMARY KEY (zoid, tid),
- prev_tid BIGINT NOT NULL REFERENCES transaction,
- md5 CHAR(32),
- state BYTEA
- );
-
- -- Pointers to the current object state
- CREATE TABLE current_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- tid BIGINT NOT NULL,
- FOREIGN KEY (zoid, tid) REFERENCES object_state
- );
-
- -- A list of referenced OIDs from each object_state.
- -- This table is populated as needed during packing.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because rows in object_state
- -- are never modified once committed, and rows are removed
- -- from object_state only by packing.
- CREATE TABLE object_ref (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL,
- to_zoid BIGINT NOT NULL,
- PRIMARY KEY (tid, zoid, to_zoid)
- );
-
- -- The object_refs_added table tracks whether object_refs has
- -- been populated for all states in a given transaction.
- -- An entry is added only when the work is finished.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because object states
- -- are never added to a transaction once committed, and
- -- rows are removed from the transaction table only by
- -- packing.
- CREATE TABLE object_refs_added (
- tid BIGINT NOT NULL PRIMARY KEY
- );
-
- -- Temporary state during packing:
- -- The list of objects to pack. If keep is false,
- -- the object and all its revisions will be removed.
- -- If keep is true, instead of removing the object,
- -- the pack operation will cut the object's history.
- -- The keep_tid field specifies the oldest revision
- -- of the object to keep.
- -- The visited flag is set when pre_pack is visiting an object's
- -- references, and remains set.
- CREATE TABLE pack_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- keep BOOLEAN NOT NULL,
- keep_tid BIGINT NOT NULL,
- visited BOOLEAN NOT NULL DEFAULT FALSE
- );
-
- -- Temporary state during packing: the list of object states to pack.
- CREATE TABLE pack_state (
- tid BIGINT NOT NULL,
- zoid BIGINT NOT NULL,
- PRIMARY KEY (tid, zoid)
- );
-
- -- Temporary state during packing: the list of transactions that
- -- have at least one object state to pack.
- CREATE TABLE pack_state_tid (
- tid BIGINT NOT NULL PRIMARY KEY
- );
- """
-
- keep_history = True
-
- _scripts = {
- 'create_temp_pack_visit': """
- CREATE TEMPORARY TABLE temp_pack_visit (
- zoid BIGINT NOT NULL,
- keep_tid BIGINT
- );
- CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
- """,
-
- 'pre_pack_follow_child_refs': """
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE keep = %(FALSE)s
- AND zoid IN (
- SELECT DISTINCT to_zoid
- FROM object_ref
- JOIN temp_pack_visit USING (zoid)
- WHERE object_ref.tid >= temp_pack_visit.keep_tid
- )
- """,
-
- 'choose_pack_transaction': """
- SELECT tid
- FROM transaction
- WHERE tid > 0
- AND tid <= %(tid)s
- AND packed = FALSE
- ORDER BY tid DESC
- LIMIT 1
- """,
-
- 'create_temp_undo': """
- CREATE TEMPORARY TABLE temp_undo (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL
- );
- CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
- """,
-
- 'reset_temp_undo': "DROP TABLE temp_undo",
-
- 'transaction_has_data': """
- SELECT tid
- FROM object_state
- WHERE tid = %(tid)s
- LIMIT 1
- """,
-
- 'pack_current_object': """
- DELETE FROM current_object
- WHERE tid = %(tid)s
- AND zoid in (
- SELECT pack_state.zoid
- FROM pack_state
- WHERE pack_state.tid = %(tid)s
- )
- """,
-
- 'pack_object_state': """
- DELETE FROM object_state
- WHERE tid = %(tid)s
- AND zoid in (
- SELECT pack_state.zoid
- FROM pack_state
- WHERE pack_state.tid = %(tid)s
- )
- """,
-
- 'pack_object_ref': """
- DELETE FROM object_refs_added
- WHERE tid IN (
- SELECT tid
- FROM transaction
- WHERE empty = %(TRUE)s
- );
- DELETE FROM object_ref
- WHERE tid IN (
- SELECT tid
- FROM transaction
- WHERE empty = %(TRUE)s
- )
- """,
- }
-
- def _transaction_iterator(self, cursor):
- """Iterate over a list of transactions returned from the database.
-
- Each row begins with (tid, username, description, extension)
- and may have other columns.
- """
- for row in cursor:
- tid, username, description, ext = row[:4]
- if username is None:
- username = ''
- else:
- username = str(username)
- if description is None:
- description = ''
- else:
- description = str(description)
- if ext is None:
- ext = ''
- else:
- ext = str(ext)
- yield (tid, username, description, ext) + tuple(row[4:])
-
-
- def iter_transactions(self, cursor):
- """Iterate over the transaction log, newest first.
-
- Skips packed transactions.
- Yields (tid, username, description, extension) for each transaction.
- """
- stmt = """
- SELECT tid, username, description, extension
- FROM transaction
- WHERE packed = %(FALSE)s
- AND tid != 0
- ORDER BY tid DESC
- """
- self._run_script_stmt(cursor, stmt)
- return self._transaction_iterator(cursor)
-
-
- def iter_transactions_range(self, cursor, start=None, stop=None):
- """Iterate over the transactions in the given range, oldest first.
-
- Includes packed transactions.
- Yields (tid, username, description, extension, packed)
- for each transaction.
- """
- stmt = """
- SELECT tid, username, description, extension,
- CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END
- FROM transaction
- WHERE tid >= 0
- """
- if start is not None:
- stmt += " AND tid >= %(min_tid)s"
- if stop is not None:
- stmt += " AND tid <= %(max_tid)s"
- stmt += " ORDER BY tid"
- self._run_script_stmt(cursor, stmt,
- {'min_tid': start, 'max_tid': stop})
- return self._transaction_iterator(cursor)
-
-
- def iter_object_history(self, cursor, oid):
- """Iterate over an object's history.
-
- Raises KeyError if the object does not exist.
- Yields (tid, username, description, extension, pickle_size)
- for each modification.
- """
- stmt = """
- SELECT 1 FROM current_object WHERE zoid = %(oid)s
- """
- self._run_script_stmt(cursor, stmt, {'oid': oid})
- if not cursor.fetchall():
- raise KeyError(oid)
-
- stmt = """
- SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
- FROM transaction
- JOIN object_state USING (tid)
- WHERE zoid = %(oid)s
- AND packed = %(FALSE)s
- ORDER BY tid DESC
- """
- self._run_script_stmt(cursor, stmt, {'oid': oid})
- return self._transaction_iterator(cursor)
-
-
- def verify_undoable(self, cursor, undo_tid):
- """Raise UndoError if it is not safe to undo the specified txn."""
- stmt = """
- SELECT 1 FROM transaction
- WHERE tid = %(undo_tid)s
- AND packed = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if not cursor.fetchall():
- raise UndoError("Transaction not found or packed")
-
- # Rule: we can undo an object if the object's state in the
- # transaction to undo matches the object's current state.
- # If any object in the transaction does not fit that rule,
- # refuse to undo.
- stmt = """
- SELECT prev_os.zoid, current_object.tid
- FROM object_state prev_os
- JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
- JOIN current_object ON (cur_os.zoid = current_object.zoid
- AND cur_os.tid = current_object.tid)
- WHERE prev_os.tid = %(undo_tid)s
- AND cur_os.md5 != prev_os.md5
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if cursor.fetchmany():
- raise UndoError(
- "Some data were modified by a later transaction")
-
- # Rule: don't allow the creation of the root object to
- # be undone. It's hard to get it back.
- stmt = """
- SELECT 1
- FROM object_state
- WHERE tid = %(undo_tid)s
- AND zoid = 0
- AND prev_tid = 0
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if cursor.fetchall():
- raise UndoError("Can't undo the creation of the root object")
-
-
- def undo(self, cursor, undo_tid, self_tid):
- """Undo a transaction.
-
- Parameters: "undo_tid", the integer tid of the transaction to undo,
- and "self_tid", the integer tid of the current transaction.
-
- Returns the states copied forward by the undo operation as a
- list of (oid, old_tid).
- """
- stmt = self._scripts['create_temp_undo']
- if stmt:
- self._run_script(cursor, stmt)
-
- stmt = """
- DELETE FROM temp_undo;
-
- -- Put into temp_undo the list of objects to be undone and
- -- the tid of the transaction that has the undone state.
- INSERT INTO temp_undo (zoid, prev_tid)
- SELECT zoid, prev_tid
- FROM object_state
- WHERE tid = %(undo_tid)s;
-
- -- Override previous undo operations within this transaction
- -- by resetting the current_object pointer and deleting
- -- copied states from object_state.
- UPDATE current_object
- SET tid = (
- SELECT prev_tid
- FROM object_state
- WHERE zoid = current_object.zoid
- AND tid = %(self_tid)s
- )
- WHERE zoid IN (SELECT zoid FROM temp_undo)
- AND tid = %(self_tid)s;
-
- DELETE FROM object_state
- WHERE zoid IN (SELECT zoid FROM temp_undo)
- AND tid = %(self_tid)s;
-
- -- Copy old states forward.
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
- prev.md5, prev.state
- FROM temp_undo
- JOIN current_object ON (temp_undo.zoid = current_object.zoid)
- LEFT JOIN object_state prev
- ON (prev.zoid = temp_undo.zoid
- AND prev.tid = temp_undo.prev_tid);
-
- -- List the copied states.
- SELECT zoid, prev_tid FROM temp_undo
- """
- self._run_script(cursor, stmt,
- {'undo_tid': undo_tid, 'self_tid': self_tid})
- res = list(cursor)
-
- stmt = self._scripts['reset_temp_undo']
- if stmt:
- self._run_script(cursor, stmt)
-
- return res
-
-
- def choose_pack_transaction(self, pack_point):
- """Return the transaction before or at the specified pack time.
-
- Returns None if there is nothing to pack.
- """
- conn, cursor = self.open()
- try:
- stmt = self._scripts['choose_pack_transaction']
- self._run_script(cursor, stmt, {'tid': pack_point})
- rows = cursor.fetchall()
- if not rows:
- # Nothing needs to be packed.
- return None
- return rows[0][0]
- finally:
- self.close(conn, cursor)
-
-
- def pre_pack(self, pack_tid, get_references, options):
- """Decide what to pack.
-
- tid specifies the most recent transaction to pack.
-
- get_references is a function that accepts a pickled state and
- returns a set of OIDs that state refers to.
-
- options is an instance of relstorage.Options.
- The options.pack_gc flag indicates whether to run garbage collection.
- If pack_gc is false, at least one revision of every object is kept,
- even if nothing refers to it. Packing with pack_gc disabled can be
- much faster.
- """
- conn, cursor = self.open_for_pre_pack()
- try:
- try:
- if options.pack_gc:
- log.info("pre_pack: start with gc enabled")
- self._pre_pack_with_gc(
- conn, cursor, pack_tid, get_references)
- else:
- log.info("pre_pack: start without gc")
- self._pre_pack_without_gc(
- conn, cursor, pack_tid)
- conn.commit()
-
- log.info("pre_pack: enumerating states to pack")
- stmt = "%(TRUNCATE)s pack_state"
- self._run_script_stmt(cursor, stmt)
- to_remove = 0
-
- if options.pack_gc:
- # Pack objects with the keep flag set to false.
- stmt = """
- INSERT INTO pack_state (tid, zoid)
- SELECT tid, zoid
- FROM object_state
- JOIN pack_object USING (zoid)
- WHERE keep = %(FALSE)s
- AND tid > 0
- AND tid <= %(pack_tid)s
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
- to_remove += cursor.rowcount
-
- # Pack object states with the keep flag set to true.
- stmt = """
- INSERT INTO pack_state (tid, zoid)
- SELECT tid, zoid
- FROM object_state
- JOIN pack_object USING (zoid)
- WHERE keep = %(TRUE)s
- AND tid > 0
- AND tid != keep_tid
- AND tid <= %(pack_tid)s
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid':pack_tid})
- to_remove += cursor.rowcount
-
- log.info("pre_pack: enumerating transactions to pack")
- stmt = "%(TRUNCATE)s pack_state_tid"
- self._run_script_stmt(cursor, stmt)
- stmt = """
- INSERT INTO pack_state_tid (tid)
- SELECT DISTINCT tid
- FROM pack_state
- """
- cursor.execute(stmt)
-
- log.info("pre_pack: will remove %d object state(s)",
- to_remove)
-
- except:
- log.exception("pre_pack: failed")
- conn.rollback()
- raise
- else:
- log.info("pre_pack: finished successfully")
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def _pre_pack_without_gc(self, conn, cursor, pack_tid):
- """Determine what to pack, without garbage collection.
-
- With garbage collection disabled, there is no need to follow
- object references.
- """
- # Fill the pack_object table with OIDs, but configure them
- # all to be kept by setting keep to true.
- log.debug("pre_pack: populating pack_object")
- stmt = """
- %(TRUNCATE)s pack_object;
-
- INSERT INTO pack_object (zoid, keep, keep_tid)
- SELECT zoid, %(TRUE)s, MAX(tid)
- FROM object_state
- WHERE tid > 0 AND tid <= %(pack_tid)s
- GROUP BY zoid
- """
- self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
-
- def _pre_pack_with_gc(self, conn, cursor, pack_tid, get_references):
- """Determine what to pack, with garbage collection.
- """
- stmt = self._scripts['create_temp_pack_visit']
- if stmt:
- self._run_script(cursor, stmt)
-
- self.fill_object_refs(conn, cursor, get_references)
-
- log.info("pre_pack: filling the pack_object table")
- # Fill the pack_object table with OIDs that either will be
- # removed (if nothing references the OID) or whose history will
- # be cut.
- stmt = """
- %(TRUNCATE)s pack_object;
-
- INSERT INTO pack_object (zoid, keep, keep_tid)
- SELECT zoid, %(FALSE)s, MAX(tid)
- FROM object_state
- WHERE tid > 0 AND tid <= %(pack_tid)s
- GROUP BY zoid;
-
- -- If the root object is in pack_object, keep it.
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE zoid = 0;
-
- -- Keep objects that have been revised since pack_tid.
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE zoid IN (
- SELECT zoid
- FROM current_object
- WHERE tid > %(pack_tid)s
- );
-
- -- Keep objects that are still referenced by object states in
- -- transactions that will not be packed.
- -- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
- INSERT INTO temp_pack_visit (zoid)
- SELECT DISTINCT to_zoid
- FROM object_ref
- WHERE tid > %(pack_tid)s;
-
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE zoid IN (
- SELECT zoid
- FROM temp_pack_visit
- );
-
- %(TRUNCATE)s temp_pack_visit;
- """
- self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
- # Set the 'keep' flags in pack_object
- self._visit_all_references(cursor)
-
-
- def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
- """Pack. Requires the information provided by pre_pack."""
-
- # Read committed mode is sufficient.
- conn, cursor = self.open()
- try:
- try:
- stmt = """
- SELECT transaction.tid,
- CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
- CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
- FROM transaction
- LEFT JOIN pack_state_tid ON (
- transaction.tid = pack_state_tid.tid)
- WHERE transaction.tid > 0
- AND transaction.tid <= %(pack_tid)s
- AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
- tid_rows = list(cursor)
- tid_rows.sort() # oldest first
-
- log.info("pack: will pack %d transaction(s)", len(tid_rows))
-
- stmt = self._scripts['create_temp_pack_visit']
- if stmt:
- self._run_script(cursor, stmt)
-
- # Hold the commit lock while packing to prevent deadlocks.
- # Pack in small batches of transactions in order to minimize
- # the interruption of concurrent write operations.
- start = time.time()
- packed_list = []
- self._hold_commit_lock(cursor)
- for tid, packed, has_removable in tid_rows:
- self._pack_transaction(
- cursor, pack_tid, tid, packed, has_removable,
- packed_list)
- if time.time() >= start + options.pack_batch_timeout:
- conn.commit()
- if packed_func is not None:
- for oid, tid in packed_list:
- packed_func(oid, tid)
- del packed_list[:]
- self._release_commit_lock(cursor)
- self._pause_pack(sleep, options, start)
- self._hold_commit_lock(cursor)
- start = time.time()
- if packed_func is not None:
- for oid, tid in packed_list:
- packed_func(oid, tid)
- packed_list = None
-
- self._pack_cleanup(conn, cursor)
-
- except:
- log.exception("pack: failed")
- conn.rollback()
- raise
-
- else:
- log.info("pack: finished successfully")
- conn.commit()
-
- finally:
- self.close(conn, cursor)
-
-
- def _pack_transaction(self, cursor, pack_tid, tid, packed,
- has_removable, packed_list):
- """Pack one transaction. Requires populated pack tables."""
- log.debug("pack: transaction %d: packing", tid)
- removed_objects = 0
- removed_states = 0
-
- if has_removable:
- stmt = self._scripts['pack_current_object']
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- removed_objects = cursor.rowcount
-
- stmt = self._scripts['pack_object_state']
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- removed_states = cursor.rowcount
-
- # Terminate prev_tid chains
- stmt = """
- UPDATE object_state SET prev_tid = 0
- WHERE prev_tid = %(tid)s
- AND tid <= %(pack_tid)s
- """
- self._run_script_stmt(cursor, stmt,
- {'pack_tid': pack_tid, 'tid': tid})
-
- stmt = """
- SELECT pack_state.zoid
- FROM pack_state
- WHERE pack_state.tid = %(tid)s
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- for (oid,) in cursor:
- packed_list.append((oid, tid))
-
- # Find out whether the transaction is empty
- stmt = self._scripts['transaction_has_data']
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- empty = not list(cursor)
-
- # mark the transaction packed and possibly empty
- if empty:
- clause = 'empty = %(TRUE)s'
- state = 'empty'
- else:
- clause = 'empty = %(FALSE)s'
- state = 'not empty'
- stmt = "UPDATE transaction SET packed = %(TRUE)s, " + clause
- stmt += " WHERE tid = %(tid)s"
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
- log.debug(
- "pack: transaction %d (%s): removed %d object(s) and %d state(s)",
- tid, state, removed_objects, removed_states)
-
-
- def _pack_cleanup(self, conn, cursor):
- """Remove unneeded table rows after packing"""
- # commit the work done so far
- conn.commit()
- self._release_commit_lock(cursor)
- self._hold_commit_lock(cursor)
- log.info("pack: cleaning up")
-
- log.debug("pack: removing unused object references")
- stmt = self._scripts['pack_object_ref']
- self._run_script(cursor, stmt)
-
- log.debug("pack: removing empty packed transactions")
- stmt = """
- DELETE FROM transaction
- WHERE packed = %(TRUE)s
- AND empty = %(TRUE)s
- """
- self._run_script_stmt(cursor, stmt)
-
- # perform cleanup that does not require the commit lock
- conn.commit()
- self._release_commit_lock(cursor)
-
- log.debug("pack: clearing temporary pack state")
- for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
- stmt = '%(TRUNCATE)s ' + _table
- self._run_script_stmt(cursor, stmt)
Added: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,251 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Interfaces provided by RelStorage database adapters"""
+
+from zope.interface import Attribute
+from zope.interface import Interface
+
+class IConnectionManager(Interface):
+
+ def open():
+ """Open a database connection and return (conn, cursor)."""
+
+ def close(conn, cursor):
+ """Close a connection and cursor, ignoring certain errors.
+ """
+
+ def open_and_call(callback):
+ """Call a function with an open connection and cursor.
+
+ If the function returns, commits the transaction and returns the
+ result returned by the function.
+ If the function raises an exception, aborts the transaction
+ then propagates the exception.
+ """
+
+
+class IDatabaseIterator(Interface):
+
+ def iter_objects(cursor, tid):
+ """Iterate over object states in a transaction.
+
+ Yields (oid, prev_tid, state) for each object state.
+ """
+
+ def iter_transactions(cursor):
+ """Iterate over the transaction log, newest first.
+
+ Skips packed transactions.
+ Yields (tid, username, description, extension) for each transaction.
+ """
+
+ def iter_transactions_range(cursor, start=None, stop=None):
+ """Iterate over the transactions in the given range, oldest first.
+
+ Includes packed transactions.
+ Yields (tid, username, description, extension, packed)
+ for each transaction.
+ """
+
+ def iter_object_history(cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+
+
+class ILocker(Interface):
+
+ def hold_commit_lock(cursor, ensure_current=False):
+ """Acquire the commit lock.
+
+ If ensure_current is True, other tables may be locked as well, to
+ ensure the most current data is available.
+
+ May raise StorageError if the lock can not be acquired before
+ some timeout.
+ """
+
+ def release_commit_lock(cursor):
+ """Release the commit lock"""
+
+ def hold_pack_lock(cursor):
+ """Try to acquire the pack lock.
+
+ Raise StorageError if packing or undo is already in progress.
+ """
+
+ def release_pack_lock(cursor):
+ """Release the pack lock."""
+
+
+class IPackUndo(Interface):
+
+ def verify_undoable(cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn.
+ """
+
+ def undo(cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the states copied forward by the undo operation as a
+ list of (oid, old_tid).
+
+ May raise UndoError.
+ """
+
+ def open_for_pre_pack():
+ """Open a connection to be used for the pre-pack phase.
+
+ Returns (conn, cursor).
+ """
+
+ def fill_object_refs(conn, cursor, get_references):
+ """Update the object_refs table by analyzing new transactions.
+ """
+
+ def choose_pack_transaction(pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+
+ def pre_pack(pack_tid, get_references, options):
+ """Decide what to pack.
+
+ pack_tid specifies the most recent transaction to pack.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+
+ options is an instance of relstorage.Options.
+ In particular, the options.pack_gc flag indicates whether
+ to run garbage collection.
+ """
+
+ def pack(pack_tid, options, sleep=None, packed_func=None):
+ """Pack. Requires the information provided by pre_pack.
+
+ packed_func, if provided, will be called for every object
+ packed, just after it is removed. The function must accept
+ two parameters, oid and tid (64 bit integers).
+
+ The sleep function defaults to time.sleep(). It can be
+ overridden to do something else instead of sleep during pauses
+ configured by the duty cycle.
+ """
+
+
+class IPoller(Interface):
+
+ def poll_invalidations(conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid).
+ """
+
+
+class ISchemaInstaller(Interface):
+
+ def create(cursor):
+ """Create the database tables, sequences, etc."""
+
+ def prepare():
+ """Create the database schema if it does not already exist."""
+
+ def zap_all():
+ """Clear all data out of the database."""
+
+ def drop_all():
+ """Drop all tables and sequences."""
+
+
+class IScriptRunner(Interface):
+
+ script_vars = Attribute(
+ """A mapping providing replacements for parts of scripts.
+
+ Used for making scripts compatible with databases using
+ different parameter styles.
+ """)
+
+ def run_script_stmt(cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ generic_params should be either an empty tuple (no parameters) or
+ a map.
+
+ The input statement is generic and will be transformed
+ into a database-specific statement.
+ """
+
+ def run_script(cursor, script, params=()):
+ """Execute a series of statements in the database.
+
+ params should be either an empty tuple (no parameters) or
+ a map.
+
+ The statements are transformed by run_script_stmt
+ before execution.
+ """
+
+ def run_many(cursor, stmt, items):
+ """Execute a statement repeatedly. Items should be a list of tuples.
+
+ stmt should use '%s' parameter format (not %(name)s).
+ """
+
+
+class ITransactionControl(Interface):
+
+ def commit_phase1(conn, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ The transaction name must not be None.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+
+ def commit_phase2(conn, cursor, txn):
+ """Final transaction commit.
+
+ txn is the name returned by commit_phase1.
+ """
+
+ def abort(conn, cursor, txn=None):
+ """Abort the commit. If txn is not None, phase 1 is also aborted."""
+
+ def get_tid_and_time(cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+
+ def add_transaction(cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+
Added: relstorage/trunk/relstorage/adapters/loadstore.py
===================================================================
--- relstorage/trunk/relstorage/adapters/loadstore.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/loadstore.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,867 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Object load/store implementations.
+"""
+
+from base64 import decodestring
+from base64 import encodestring
+from ZODB.POSException import StorageError
+import time
+
+try:
+ from hashlib import md5
+except ImportError:
+ from md5 import new as md5
+
+
+def compute_md5sum(data):
+ if data is not None:
+ return md5(data).hexdigest()
+ else:
+ # George Bailey object
+ return None
+
+
+class HistoryPreservingPostgreSQLLoadStore(object):
+
+ def __init__(self, connmanager, disconnected_exceptions):
+ self.connmanager = connmanager
+ self.disconnected_exceptions = disconnected_exceptions
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.connmanager.open(
+ self.connmanager.isolation_serializable)
+ stmt = """
+ PREPARE get_latest_tid AS
+ SELECT tid
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ return conn, cursor
+
+ def restart_load(self, cursor):
+ """Reinitialize a connection for loading objects."""
+ try:
+ cursor.connection.rollback()
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def get_current_tid(self, cursor, oid):
+ """Returns the current integer tid for an object.
+
+ oid is an integer. Returns None if object does not exist.
+ """
+ cursor.execute("""
+ SELECT tid
+ FROM current_object
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ return None
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64'), tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # This object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64')
+ FROM object_state
+ WHERE zoid = %s
+ AND tid = %s
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ (state64,) = cursor.fetchone()
+ if state64 is not None:
+ return decodestring(state64)
+ return None
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+ return cursor.rowcount
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64'), tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid < %s
+ ORDER BY tid DESC
+ LIMIT 1
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # The object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid > %s
+ ORDER BY tid
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ else:
+ return None
+
+ def _make_temp_table(self, cursor):
+ """Create the temporary table for storing objects"""
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state BYTEA
+ ) ON COMMIT DROP;
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
+ cursor.execute(stmt)
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.connmanager.open()
+ try:
+ self._make_temp_table(cursor)
+ return conn, cursor
+ except:
+ self.connmanager.close(conn, cursor)
+ raise
+
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ self._make_temp_table(cursor)
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def store_temp(self, cursor, oid, prev_tid, data):
+ """Store an object in the temporary table."""
+ md5sum = compute_md5sum(data)
+ stmt = """
+ DELETE FROM temp_store WHERE zoid = %s;
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (%s, %s, %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
+
+ def replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table."""
+ md5sum = compute_md5sum(data)
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = decode(%s, 'base64')
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
+
+ def restore(self, cursor, oid, tid, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ md5sum = compute_md5sum(data)
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (%s, %s,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+ %s, decode(%s, 'base64'))
+ """
+ if data is not None:
+ data = encodestring(data)
+ cursor.execute(stmt, (oid, tid, oid, md5sum, data))
+
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ encode(temp_store.state, 'base64')
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ if cursor.rowcount:
+ oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+ return oid, prev_tid, attempted_prev_tid, decodestring(data)
+ return None
+
+ def move_from_temp(self, cursor, tid):
+ """Moved the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, %s, prev_tid, md5, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, (tid,))
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ cursor.execute("""
+ -- Insert objects created in this transaction into current_object.
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid = 0;
+
+ -- Change existing objects. To avoid deadlocks,
+ -- update in OID order.
+ UPDATE current_object SET tid = %(tid)s
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid != 0
+ ORDER BY zoid
+ )
+ """, {'tid': tid})
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ cursor.execute("""
+ SELECT CASE WHEN %s > nextval('zoid_seq')
+ THEN setval('zoid_seq', %s)
+ ELSE 0
+ END
+ """, (oid, oid))
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT NEXTVAL('zoid_seq')"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
+
+
+class HistoryPreservingMySQLLoadStore(object):
+
+ def __init__(self, connmanager, disconnected_exceptions, Binary):
+ self.connmanager = connmanager
+ self.disconnected_exceptions = disconnected_exceptions
+ self.Binary = Binary
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ return self.connmanager.open(
+ self.connmanager.isolation_repeatable_read)
+
+ def restart_load(self, cursor):
+ """Reinitialize a connection for loading objects."""
+ try:
+ cursor.connection.rollback()
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def get_current_tid(self, cursor, oid):
+ """Returns the current integer tid for an object.
+
+ oid is an integer. Returns None if object does not exist.
+ """
+ cursor.execute("""
+ SELECT tid
+ FROM current_object
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ return None
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ cursor.execute("""
+ SELECT state, tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+ else:
+ return None, None
+
+ def load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ cursor.execute("""
+ SELECT state
+ FROM object_state
+ WHERE zoid = %s
+ AND tid = %s
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ (state,) = cursor.fetchone()
+ return state
+ return None
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+ return cursor.rowcount
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ cursor.execute("""
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid < %s
+ ORDER BY tid DESC
+ LIMIT 1
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+ else:
+ return None, None
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid > %s
+ ORDER BY tid
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ else:
+ return None
+
+ def _make_temp_table(self, cursor):
+ """Create the temporary table for storing objects"""
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state LONGBLOB
+ ) ENGINE MyISAM
+ """
+ cursor.execute(stmt)
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.connmanager.open()
+ try:
+ self._make_temp_table(cursor)
+ return conn, cursor
+ except:
+ self.connmanager.close(conn, cursor)
+ raise
+
+ def _restart_temp_table(self, cursor):
+ """Restart the temporary table for storing objects"""
+ stmt = """
+ DROP TEMPORARY TABLE IF EXISTS temp_store
+ """
+ cursor.execute(stmt)
+ self._make_temp_table(cursor)
+
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ self._restart_temp_table(cursor)
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def store_temp(self, cursor, oid, prev_tid, data):
+ """Store an object in the temporary table."""
+ md5sum = compute_md5sum(data)
+ stmt = """
+ REPLACE INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (%s, %s, %s, %s)
+ """
+ cursor.execute(stmt, (oid, prev_tid, md5sum, self.Binary(data)))
+
+ def replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table."""
+ md5sum = compute_md5sum(data)
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = %s
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
+
+ def restore(self, cursor, oid, tid, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ md5sum = compute_md5sum(data)
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (%s, %s,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+ %s, %s)
+ """
+ if data is not None:
+ data = self.Binary(data)
+ cursor.execute(stmt, (oid, tid, oid, md5sum, data))
+
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ # Lock in share mode to ensure the data being read is up to date.
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ LIMIT 1
+ LOCK IN SHARE MODE
+ """
+ cursor.execute(stmt)
+ if cursor.rowcount:
+ return cursor.fetchone()
+ return None
+
+ def move_from_temp(self, cursor, tid):
+ """Moved the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, %s, prev_tid, md5, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, (tid,))
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ cursor.execute("""
+ REPLACE INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = %s
+ """, (tid,))
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "INSERT INTO new_oid VALUES ()"
+ cursor.execute(stmt)
+ oid = cursor.connection.insert_id()
+ if oid % 100 == 0:
+ # Clean out previously generated OIDs.
+ stmt = "DELETE FROM new_oid WHERE zoid < %s"
+ cursor.execute(stmt, (oid,))
+ return oid
+
+
+class HistoryPreservingOracleLoadStore(object):
+
+ def __init__(self, connmanager, runner, disconnected_exceptions,
+ Binary, inputsize_BLOB, inputsize_BINARY, twophase):
+ self.connmanager = connmanager
+ self.runner = runner
+ self.disconnected_exceptions = disconnected_exceptions
+ self.Binary = Binary
+ self.inputsize_BLOB = inputsize_BLOB
+ self.inputsize_BINARY = inputsize_BINARY
+ self.twophase = twophase
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ return self.connmanager.open(self.connmanager.isolation_read_only)
+
+ def restart_load(self, cursor):
+ """Reinitialize a connection for loading objects."""
+ try:
+ cursor.connection.rollback()
+ cursor.execute("SET TRANSACTION READ ONLY")
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def get_current_tid(self, cursor, oid):
+ """Returns the current integer tid for an object.
+
+ oid is an integer. Returns None if object does not exist.
+ """
+ cursor.execute("""
+ SELECT tid
+ FROM current_object
+ WHERE zoid = :1
+ """, (oid,))
+ for (tid,) in cursor:
+ return tid
+ return None
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ stmt = """
+ SELECT state, tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = :1
+ """
+ return self.runner.run_lob_stmt(
+ cursor, stmt, (oid,), default=(None, None))
+
+ def load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ stmt = """
+ SELECT state
+ FROM object_state
+ WHERE zoid = :1
+ AND tid = :2
+ """
+ (state,) = self.runner.run_lob_stmt(
+ cursor, stmt, (oid, tid), default=(None,))
+ return state
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
+ return len(list(cursor))
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ stmt = """
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = :oid
+ AND tid = (
+ SELECT MAX(tid)
+ FROM object_state
+ WHERE zoid = :oid
+ AND tid < :tid
+ )
+ """
+ return self.runner.run_lob_stmt(
+ cursor, stmt, {'oid': oid, 'tid': tid}, default=(None, None))
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT MIN(tid)
+ FROM object_state
+ WHERE zoid = :1
+ AND tid > :2
+ """
+ cursor.execute(stmt, (oid, tid))
+ rows = cursor.fetchall()
+ if rows:
+ assert len(rows) == 1
+ return rows[0][0]
+ else:
+ return None
+
+ def _set_xid(self, cursor):
+ """Set up a distributed transaction"""
+ stmt = """
+ SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+ """
+ cursor.execute(stmt)
+ xid = str(cursor.fetchone()[0])
+ cursor.connection.begin(0, xid, '0')
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ if self.twophase:
+ conn, cursor = self.connmanager.open(
+ transaction_mode=None, twophase=True)
+ try:
+ self._set_xid(cursor)
+ except:
+ self.close(conn, cursor)
+ raise
+ else:
+ conn, cursor = self.connmanager.open()
+ return conn, cursor
+
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ if self.twophase:
+ self._set_xid(cursor)
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def store_temp(self, cursor, oid, prev_tid, data):
+ """Store an object in the temporary table."""
+ md5sum = compute_md5sum(data)
+ cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
+ if len(data) <= 2000:
+ # Send data inline for speed. Oracle docs say maximum size
+ # of a RAW is 2000 bytes. inputsize_BINARY corresponds with RAW.
+ cursor.setinputsizes(rawdata=self.inputsize_BINARY)
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (:oid, :prev_tid, :md5sum, :rawdata)
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, rawdata=data)
+ else:
+ # Send data as a BLOB
+ cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (:oid, :prev_tid, :md5sum, :blobdata)
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, blobdata=data)
+
+ def replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table."""
+ md5sum = compute_md5sum(data)
+ cursor.setinputsizes(data=self.inputsize_BLOB)
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = :prev_tid,
+ md5 = :md5sum,
+ state = :data
+ WHERE zoid = :oid
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, data=self.Binary(data))
+
+ def restore(self, cursor, oid, tid, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ md5sum = compute_md5sum(data)
+ cursor.setinputsizes(data=self.inputsize_BLOB)
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (:oid, :tid,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
+ :md5sum, :data)
+ """
+ if data is not None:
+ data = self.Binary(data)
+ cursor.execute(stmt, oid=oid, tid=tid, md5sum=md5sum, data=data)
+
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ """
+ return self.runner.run_lob_stmt(cursor, stmt)
+
+ def move_from_temp(self, cursor, tid):
+ """Move the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, :tid, prev_tid, md5, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, tid=tid)
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ # Insert objects created in this transaction into current_object.
+ stmt = """
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = :1
+ AND prev_tid = 0
+ """
+ cursor.execute(stmt, (tid,))
+
+ # Change existing objects.
+ stmt = """
+ UPDATE current_object SET tid = :1
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
+ WHERE tid = :1
+ AND prev_tid != 0
+ )
+ """
+ cursor.execute(stmt, (tid,))
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ next_oid = self.new_oid(cursor)
+ if next_oid < oid:
+ # Oracle provides no way modify the sequence value
+ # except through alter sequence or drop/create sequence,
+ # but either statement kills the current transaction.
+ # Therefore, open a temporary connection to make the
+ # alteration.
+ conn2, cursor2 = self.connmanager.open()
+ try:
+ # Change the sequence by altering the increment.
+ # (this is safer than dropping and re-creating the sequence)
+ diff = oid - next_oid
+ cursor2.execute(
+ "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
+ cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
+ cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
+ conn2.commit()
+ finally:
+ self.connmanager.close(conn2, cursor2)
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT zoid_seq.nextval FROM DUAL"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
Added: relstorage/trunk/relstorage/adapters/locker.py
===================================================================
--- relstorage/trunk/relstorage/adapters/locker.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/locker.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,157 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Locker implementations.
+"""
+
+from relstorage.adapters.interfaces import ILocker
+from ZODB.POSException import StorageError
+from zope.interface import implements
+import re
+
+commit_lock_timeout = 30
+
+
+class Locker(object):
+
+ def __init__(self, database_errors):
+ self.database_errors = database_errors
+
+
+class PostgreSQLLocker(Locker):
+ implements(ILocker)
+
+ def hold_commit_lock(self, cursor, ensure_current=False):
+ if ensure_current:
+ # Hold commit_lock to prevent concurrent commits
+ # (for as short a time as possible).
+ # Lock transaction and current_object in share mode to ensure
+ # conflict detection has the most current data.
+ cursor.execute("""
+ LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+ LOCK TABLE transaction IN SHARE MODE;
+ LOCK TABLE current_object IN SHARE MODE
+ """)
+ else:
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+ def release_commit_lock(self, cursor):
+ # no action needed
+ pass
+
+ def _pg_version(self, cursor):
+ """Return the (major, minor) version of PostgreSQL"""
+ cursor.execute("SELECT version()")
+ v = cursor.fetchone()[0]
+ m = re.search(r"([0-9]+)[.]([0-9]+)", v)
+ if m is None:
+ raise AssertionError("Unable to detect PostgreSQL version: " + v)
+ else:
+ return int(m.group(1)), int(m.group(2))
+
+ def _pg_has_advisory_locks(self, cursor):
+ """Return true if this version of PostgreSQL supports advisory locks"""
+ return self._pg_version(cursor) >= (8, 2)
+
+ def create_pack_lock(self, cursor):
+ if not self._pg_has_advisory_locks(cursor):
+ cursor.execute("CREATE TABLE pack_lock ()")
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the pack lock.
+
+ Raise an exception if packing or undo is already in progress.
+ """
+ if self._pg_has_advisory_locks(cursor):
+ cursor.execute("SELECT pg_try_advisory_lock(1)")
+ locked = cursor.fetchone()[0]
+ if not locked:
+ raise StorageError('A pack or undo operation is in progress')
+ else:
+ # b/w compat
+ try:
+ cursor.execute("LOCK pack_lock IN EXCLUSIVE MODE NOWAIT")
+ except self.database_errors: # psycopg2.DatabaseError:
+ raise StorageError('A pack or undo operation is in progress')
+
+ def release_pack_lock(self, cursor):
+ """Release the pack lock."""
+ if self._pg_has_advisory_locks(cursor):
+ cursor.execute("SELECT pg_advisory_unlock(1)")
+ # else no action needed since the lock will be released at txn commit
+
+
+class MySQLLocker(Locker):
+ implements(ILocker)
+
+ def hold_commit_lock(self, cursor, ensure_current=False):
+ cursor.execute("SELECT GET_LOCK(CONCAT(DATABASE(), '.commit'), %s)",
+ (commit_lock_timeout,))
+ locked = cursor.fetchone()[0]
+ if not locked:
+ raise StorageError("Unable to acquire commit lock")
+
+ def release_commit_lock(self, cursor):
+ cursor.execute("SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.commit'))")
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the pack lock.
+
+ Raise an exception if packing or undo is already in progress.
+ """
+ stmt = "SELECT GET_LOCK(CONCAT(DATABASE(), '.pack'), 0)"
+ cursor.execute(stmt)
+ res = cursor.fetchone()[0]
+ if not res:
+ raise StorageError('A pack or undo operation is in progress')
+
+ def release_pack_lock(self, cursor):
+ """Release the pack lock."""
+ stmt = "SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.pack'))"
+ cursor.execute(stmt)
+
+
+class OracleLocker(Locker):
+ implements(ILocker)
+
+ def hold_commit_lock(self, cursor, ensure_current=False):
+ # Hold commit_lock to prevent concurrent commits
+ # (for as short a time as possible).
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+ if ensure_current:
+ # Lock transaction and current_object in share mode to ensure
+ # conflict detection has the most current data.
+ cursor.execute("LOCK TABLE transaction IN SHARE MODE")
+ cursor.execute("LOCK TABLE current_object IN SHARE MODE")
+
+ def release_commit_lock(self, cursor):
+ # no action needed
+ pass
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the pack lock.
+
+ Raise an exception if packing or undo is already in progress.
+ """
+ stmt = """
+ LOCK TABLE pack_lock IN EXCLUSIVE MODE NOWAIT
+ """
+ try:
+ cursor.execute(stmt)
+ except self.database_errors: # cx_Oracle.DatabaseError:
+ raise StorageError('A pack or undo operation is in progress')
+
+ def release_pack_lock(self, cursor):
+ """Release the pack lock."""
+ # No action needed
+ pass
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -50,232 +50,133 @@
import logging
import MySQLdb
-import time
-from ZODB.POSException import StorageError
-from relstorage.adapters.historypreserving import HistoryPreservingAdapter
+from relstorage.adapters.connmanager import AbstractConnectionManager
+from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
+from relstorage.adapters.loadstore import HistoryPreservingMySQLLoadStore
+from relstorage.adapters.locker import MySQLLocker
+from relstorage.adapters.packundo import HistoryPreservingPackUndo
+from relstorage.adapters.poller import Poller
+from relstorage.adapters.schema import HistoryPreservingMySQLSchema
+from relstorage.adapters.scriptrunner import ScriptRunner
+from relstorage.adapters.stats import MySQLStats
+from relstorage.adapters.txncontrol import MySQLTransactionControl
-log = logging.getLogger("relstorage.adapters.mysql")
+log = logging.getLogger(__name__)
-commit_lock_timeout = 30
-
# disconnected_exceptions contains the exception types that might be
# raised when the connection to the database has been broken.
disconnected_exceptions = (MySQLdb.OperationalError, MySQLdb.InterfaceError)
-# close_exceptions contains the exception types to ignore
-# when the adapter attempts to close a database connection.
-close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
-
-class MySQLAdapter(HistoryPreservingAdapter):
+class MySQLAdapter(object):
"""MySQL adapter for RelStorage."""
- _scripts = HistoryPreservingAdapter._scripts.copy()
- # Work around a MySQL performance bug by avoiding an expensive subquery.
- # See: http://mail.zope.org/pipermail/zodb-dev/2008-May/011880.html
- # http://bugs.mysql.com/bug.php?id=28257
- _scripts.update({
- 'create_temp_pack_visit': """
- CREATE TEMPORARY TABLE temp_pack_visit (
- zoid BIGINT NOT NULL,
- keep_tid BIGINT
- );
- CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
- CREATE TEMPORARY TABLE temp_pack_child (
- zoid BIGINT NOT NULL
- );
- CREATE UNIQUE INDEX temp_pack_child_zoid ON temp_pack_child (zoid);
- """,
+ keep_history = True
- # Note: UPDATE must be the last statement in the script
- # because it returns a value.
- 'pre_pack_follow_child_refs': """
- %(TRUNCATE)s temp_pack_child;
-
- INSERT INTO temp_pack_child
- SELECT DISTINCT to_zoid
- FROM object_ref
- JOIN temp_pack_visit USING (zoid)
- WHERE object_ref.tid >= temp_pack_visit.keep_tid;
-
- -- MySQL-specific syntax for table join in update
- UPDATE pack_object, temp_pack_child SET keep = %(TRUE)s
- WHERE keep = %(FALSE)s
- AND pack_object.zoid = temp_pack_child.zoid;
- """,
-
- # MySQL optimizes deletion far better when using a join syntax.
- 'pack_current_object': """
- DELETE FROM current_object
- USING current_object
- JOIN pack_state USING (zoid, tid)
- WHERE current_object.tid = %(tid)s
- """,
-
- 'pack_object_state': """
- DELETE FROM object_state
- USING object_state
- JOIN pack_state USING (zoid, tid)
- WHERE object_state.tid = %(tid)s
- """,
-
- 'pack_object_ref': """
- DELETE FROM object_refs_added
- USING object_refs_added
- JOIN transaction USING (tid)
- WHERE transaction.empty = true;
-
- DELETE FROM object_ref
- USING object_ref
- JOIN transaction USING (tid)
- WHERE transaction.empty = true
- """,
- })
-
def __init__(self, **params):
- self._params = params.copy()
+ self.connmanager = MySQLdbConnectionManager(params)
+ self.runner = ScriptRunner()
+ self.locker = MySQLLocker((MySQLdb.DatabaseError,))
+ self.schema = HistoryPreservingMySQLSchema(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ )
+ self.loadstore = HistoryPreservingMySQLLoadStore(
+ connmanager=self.connmanager,
+ disconnected_exceptions=disconnected_exceptions,
+ Binary=MySQLdb.Binary,
+ )
+ self.txncontrol = MySQLTransactionControl(
+ Binary=MySQLdb.Binary,
+ )
+ self.poller = Poller(
+ poll_query="SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1",
+ keep_history=True,
+ runner=self.runner,
+ )
+ self.packundo = HistoryPreservingPackUndo(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ locker=self.locker,
+ )
+ self.dbiter = HistoryPreservingDatabaseIterator(
+ runner=self.runner,
+ )
+ self.stats = MySQLStats(
+ connmanager=self.connmanager,
+ )
- def create_schema(self, cursor):
- """Create the database tables."""
- stmt = """
- -- The list of all transactions in the database
- CREATE TABLE transaction (
- tid BIGINT NOT NULL PRIMARY KEY,
- packed BOOLEAN NOT NULL DEFAULT FALSE,
- empty BOOLEAN NOT NULL DEFAULT FALSE,
- username BLOB NOT NULL,
- description BLOB NOT NULL,
- extension BLOB
- ) ENGINE = InnoDB;
+ self.open = self.connmanager.open
+ self.close = self.connmanager.close
- -- Create a special transaction to represent object creation. This
- -- row is often referenced by object_state.prev_tid, but never by
- -- object_state.tid.
- INSERT INTO transaction (tid, username, description)
- VALUES (0, 'system', 'special transaction for object creation');
+ self.hold_commit_lock = self.locker.hold_commit_lock
+ self.release_commit_lock = self.locker.release_commit_lock
+ self.hold_pack_lock = self.locker.hold_pack_lock
+ self.release_pack_lock = self.locker.release_pack_lock
- -- All OIDs allocated in the database. Note that this table
- -- is purposely non-transactional.
- CREATE TABLE new_oid (
- zoid BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
- ) ENGINE = MyISAM;
+ self.create_schema = self.schema.create
+ self.prepare_schema = self.schema.prepare
+ self.zap_all = self.schema.zap_all
+ self.drop_all = self.schema.drop_all
- -- All object states in all transactions. Note that md5 and state
- -- can be null to represent object uncreation.
- CREATE TABLE object_state (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL REFERENCES transaction,
- PRIMARY KEY (zoid, tid),
- prev_tid BIGINT NOT NULL REFERENCES transaction,
- md5 CHAR(32) CHARACTER SET ascii,
- state LONGBLOB,
- CHECK (tid > 0)
- ) ENGINE = InnoDB;
- CREATE INDEX object_state_tid ON object_state (tid);
- CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+ self.open_for_load = self.loadstore.open_for_load
+ self.restart_load = self.loadstore.restart_load
+ self.get_current_tid = self.loadstore.get_current_tid
+ self.load_current = self.loadstore.load_current
+ self.load_revision = self.loadstore.load_revision
+ self.exists = self.loadstore.exists
+ self.load_before = self.loadstore.load_before
+ self.get_object_tid_after = self.loadstore.get_object_tid_after
- -- Pointers to the current object state
- CREATE TABLE current_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- tid BIGINT NOT NULL,
- FOREIGN KEY (zoid, tid) REFERENCES object_state (zoid, tid)
- ) ENGINE = InnoDB;
- CREATE INDEX current_object_tid ON current_object (tid);
+ self.open_for_store = self.loadstore.open_for_store
+ self.restart_store = self.loadstore.restart_store
+ self.store_temp = self.loadstore.store_temp
+ self.replace_temp = self.loadstore.replace_temp
+ self.restore = self.loadstore.restore
+ self.detect_conflict = self.loadstore.detect_conflict
+ self.move_from_temp = self.loadstore.move_from_temp
+ self.update_current = self.loadstore.update_current
+ self.set_min_oid = self.loadstore.set_min_oid
+ self.new_oid = self.loadstore.new_oid
- -- A list of referenced OIDs from each object_state.
- -- This table is populated as needed during packing.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because rows in object_state
- -- are never modified once committed, and rows are removed
- -- from object_state only by packing.
- CREATE TABLE object_ref (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL,
- to_zoid BIGINT NOT NULL,
- PRIMARY KEY (tid, zoid, to_zoid)
- ) ENGINE = MyISAM;
+ self.get_tid_and_time = self.txncontrol.get_tid_and_time
+ self.add_transaction = self.txncontrol.add_transaction
+ self.commit_phase1 = self.txncontrol.commit_phase1
+ self.commit_phase2 = self.txncontrol.commit_phase2
+ self.abort = self.txncontrol.abort
- -- The object_refs_added table tracks whether object_refs has
- -- been populated for all states in a given transaction.
- -- An entry is added only when the work is finished.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because object states
- -- are never added to a transaction once committed, and
- -- rows are removed from the transaction table only by
- -- packing.
- CREATE TABLE object_refs_added (
- tid BIGINT NOT NULL PRIMARY KEY
- ) ENGINE = MyISAM;
+ self.poll_invalidations = self.poller.poll_invalidations
- -- Temporary state during packing:
- -- The list of objects to pack. If keep is false,
- -- the object and all its revisions will be removed.
- -- If keep is true, instead of removing the object,
- -- the pack operation will cut the object's history.
- -- The keep_tid field specifies the oldest revision
- -- of the object to keep.
- -- The visited flag is set when pre_pack is visiting an object's
- -- references, and remains set.
- CREATE TABLE pack_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- keep BOOLEAN NOT NULL,
- keep_tid BIGINT NOT NULL,
- visited BOOLEAN NOT NULL DEFAULT FALSE
- ) ENGINE = MyISAM;
- CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+ self.fill_object_refs = self.packundo.fill_object_refs
+ self.open_for_pre_pack = self.packundo.open_for_pre_pack
+ self.choose_pack_transaction = self.packundo.choose_pack_transaction
+ self.pre_pack = self.packundo.pre_pack
+ self.pack = self.packundo.pack
+ self.verify_undoable = self.packundo.verify_undoable
+ self.undo = self.packundo.undo
- -- Temporary state during packing: the list of object states to pack.
- CREATE TABLE pack_state (
- tid BIGINT NOT NULL,
- zoid BIGINT NOT NULL,
- PRIMARY KEY (tid, zoid)
- ) ENGINE = MyISAM;
+ self.iter_objects = self.dbiter.iter_objects
+ self.iter_transactions = self.dbiter.iter_transactions
+ self.iter_transactions_range = self.dbiter.iter_transactions_range
+ self.iter_object_history = self.dbiter.iter_object_history
- -- Temporary state during packing: the list of transactions that
- -- have at least one object state to pack.
- CREATE TABLE pack_state_tid (
- tid BIGINT NOT NULL PRIMARY KEY
- ) ENGINE = MyISAM;
- """
- self._run_script(cursor, stmt)
+ self.get_object_count = self.stats.get_object_count
+ self.get_db_size = self.stats.get_db_size
- def prepare_schema(self):
- """Create the database schema if it does not already exist."""
- def callback(conn, cursor):
- cursor.execute("SHOW TABLES LIKE 'object_state'")
- if not cursor.rowcount:
- self.create_schema(cursor)
- self._open_and_call(callback)
+class MySQLdbConnectionManager(AbstractConnectionManager):
- def zap_all(self):
- """Clear all data out of the database."""
- def callback(conn, cursor):
- stmt = """
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- TRUNCATE new_oid;
- DELETE FROM transaction;
- -- Create a transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, 'system', 'special transaction for object creation');
- """
- self._run_script(cursor, stmt)
- self._open_and_call(callback)
+ isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
+ isolation_repeatable_read = "ISOLATION LEVEL REPEATABLE READ"
- def drop_all(self):
- """Drop all tables and sequences."""
- def callback(conn, cursor):
- for tablename in ('pack_state_tid', 'pack_state',
- 'pack_object', 'object_refs_added', 'object_ref',
- 'current_object', 'object_state', 'new_oid',
- 'transaction'):
- cursor.execute("DROP TABLE IF EXISTS %s" % tablename)
- self._open_and_call(callback)
+ # close_exceptions contains the exception types to ignore
+ # when the adapter attempts to close a database connection.
+ close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
+ def __init__(self, params):
+ self._params = params.copy()
+
def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
"""Open a database connection and return (conn, cursor)."""
try:
@@ -291,396 +192,3 @@
log.warning("Unable to connect: %s", e)
raise
- def close(self, conn, cursor):
- """Close a connection and cursor, ignoring certain errors.
- """
- for obj in (cursor, conn):
- if obj is not None:
- try:
- obj.close()
- except close_exceptions:
- pass
-
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
- """
- return self.open("ISOLATION LEVEL REPEATABLE READ")
-
- def restart_load(self, cursor):
- """Reinitialize a connection for loading objects."""
- try:
- cursor.connection.rollback()
- except disconnected_exceptions, e:
- raise StorageError(e)
-
- def get_object_count(self):
- """Returns the number of objects in the database"""
- # do later
- return 0
-
- def get_db_size(self):
- """Returns the approximate size of the database in bytes"""
- conn, cursor = self.open()
- try:
- cursor.execute("SHOW TABLE STATUS")
- description = [i[0] for i in cursor.description]
- rows = list(cursor)
- finally:
- self.close(conn, cursor)
- data_column = description.index('Data_length')
- index_column = description.index('Index_length')
- return sum([row[data_column] + row[index_column] for row in rows], 0)
-
- def get_current_tid(self, cursor, oid):
- """Returns the current integer tid for an object.
-
- oid is an integer. Returns None if object does not exist.
- """
- cursor.execute("""
- SELECT tid
- FROM current_object
- WHERE zoid = %s
- """, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- return None
-
- def load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
-
- oid is an integer. Returns (None, None) if object does not exist.
- """
- cursor.execute("""
- SELECT state, tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = %s
- """, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()
- else:
- return None, None
-
- def load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
-
- Returns None if no such state exists.
- """
- cursor.execute("""
- SELECT state
- FROM object_state
- WHERE zoid = %s
- AND tid = %s
- """, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- (state,) = cursor.fetchone()
- return state
- return None
-
- def exists(self, cursor, oid):
- """Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
- return cursor.rowcount
-
- def load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- cursor.execute("""
- SELECT state, tid
- FROM object_state
- WHERE zoid = %s
- AND tid < %s
- ORDER BY tid DESC
- LIMIT 1
- """, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()
- else:
- return None, None
-
- def get_object_tid_after(self, cursor, oid, tid):
- """Returns the tid of the next change after an object revision.
-
- Returns None if no later state exists.
- """
- stmt = """
- SELECT tid
- FROM object_state
- WHERE zoid = %s
- AND tid > %s
- ORDER BY tid
- LIMIT 1
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- else:
- return None
-
- def _make_temp_table(self, cursor):
- """Create the temporary table for storing objects"""
- stmt = """
- CREATE TEMPORARY TABLE temp_store (
- zoid BIGINT NOT NULL PRIMARY KEY,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state LONGBLOB
- ) ENGINE MyISAM
- """
- cursor.execute(stmt)
-
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.open()
- try:
- self._make_temp_table(cursor)
- return conn, cursor
- except:
- self.close(conn, cursor)
- raise
-
- def _restart_temp_table(self, cursor):
- """Restart the temporary table for storing objects"""
- stmt = """
- DROP TEMPORARY TABLE IF EXISTS temp_store
- """
- cursor.execute(stmt)
- self._make_temp_table(cursor)
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
- try:
- cursor.connection.rollback()
- self._restart_temp_table(cursor)
- except disconnected_exceptions, e:
- raise StorageError(e)
-
- def store_temp(self, cursor, oid, prev_tid, data):
- """Store an object in the temporary table."""
- md5sum = self.md5sum(data)
- stmt = """
- REPLACE INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (%s, %s, %s, %s)
- """
- cursor.execute(stmt, (oid, prev_tid, md5sum, MySQLdb.Binary(data)))
-
- def replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table."""
- md5sum = self.md5sum(data)
- stmt = """
- UPDATE temp_store SET
- prev_tid = %s,
- md5 = %s,
- state = %s
- WHERE zoid = %s
- """
- cursor.execute(stmt, (prev_tid, md5sum, MySQLdb.Binary(data), oid))
-
- def restore(self, cursor, oid, tid, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- md5sum = self.md5sum(data)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s,
- COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
- %s, %s)
- """
- if data is not None:
- data = MySQLdb.Binary(data)
- cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-
- def start_commit(self, cursor):
- """Prepare to commit."""
- self._hold_commit_lock(cursor)
-
- def get_tid_and_time(self, cursor):
- """Returns the most recent tid and the current database time.
-
- The database time is the number of seconds since the epoch.
- """
- # Lock in share mode to ensure the data being read is up to date.
- cursor.execute("""
- SELECT tid, UNIX_TIMESTAMP()
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- LOCK IN SHARE MODE
- """)
- assert cursor.rowcount == 1
- tid, timestamp = cursor.fetchone()
- # MySQL does not provide timestamps with more than one second
- # precision. To provide more precision, if the system time is
- # within one minute of the MySQL time, use the system time instead.
- now = time.time()
- if abs(now - timestamp) <= 60.0:
- timestamp = now
- return tid, timestamp
-
- def add_transaction(self, cursor, tid, username, description, extension,
- packed=False):
- """Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (%s, %s, %s, %s, %s)
- """
- cursor.execute(stmt, (
- tid, packed, MySQLdb.Binary(username),
- MySQLdb.Binary(description), MySQLdb.Binary(extension)))
-
- def detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- # Lock in share mode to ensure the data being read is up to date.
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- temp_store.state
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- LIMIT 1
- LOCK IN SHARE MODE
- """
- cursor.execute(stmt)
- if cursor.rowcount:
- return cursor.fetchone()
- return None
-
- def move_from_temp(self, cursor, tid):
- """Moved the temporarily stored objects to permanent storage.
-
- Returns the list of oids stored.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT zoid, %s, prev_tid, md5, state
- FROM temp_store
- """
- cursor.execute(stmt, (tid,))
-
- stmt = """
- SELECT zoid FROM temp_store
- """
- cursor.execute(stmt)
- return [oid for (oid,) in cursor]
-
- def update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- cursor.execute("""
- REPLACE INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = %s
- """, (tid,))
-
- def set_min_oid(self, cursor, oid):
- """Ensure the next OID is at least the given OID."""
- cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
-
- def commit_phase1(self, cursor, tid):
- """Begin a commit. Returns the transaction name.
-
- This method should guarantee that commit_phase2() will succeed,
- meaning that if commit_phase2() would raise any error, the error
- should be raised in commit_phase1() instead.
- """
- return '-'
-
- def commit_phase2(self, cursor, txn):
- """Final transaction commit."""
- cursor.connection.commit()
- self._release_commit_lock(cursor)
-
- def abort(self, cursor, txn=None):
- """Abort the commit. If txn is not None, phase 1 is also aborted."""
- cursor.connection.rollback()
- self._release_commit_lock(cursor)
-
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "INSERT INTO new_oid VALUES ()"
- cursor.execute(stmt)
- oid = cursor.connection.insert_id()
- if oid % 100 == 0:
- # Clean out previously generated OIDs.
- stmt = "DELETE FROM new_oid WHERE zoid < %s"
- cursor.execute(stmt, (oid,))
- return oid
-
-
- def hold_pack_lock(self, cursor):
- """Try to acquire the pack lock.
-
- Raise an exception if packing or undo is already in progress.
- """
- stmt = "SELECT GET_LOCK(CONCAT(DATABASE(), '.pack'), 0)"
- cursor.execute(stmt)
- res = cursor.fetchone()[0]
- if not res:
- raise StorageError('A pack or undo operation is in progress')
-
-
- def release_pack_lock(self, cursor):
- """Release the pack lock."""
- stmt = "SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.pack'))"
- cursor.execute(stmt)
-
-
- def open_for_pre_pack(self):
- """Open a connection to be used for the pre-pack phase.
- Returns (conn, cursor).
-
- This overrides a method.
- """
- conn, cursor = self.open(transaction_mode=None)
- try:
- # This phase of packing works best with transactions
- # disabled. It changes no user-facing data.
- conn.autocommit(True)
- return conn, cursor
- except:
- self.close(conn, cursor)
- raise
-
-
- def _hold_commit_lock(self, cursor):
- """Hold the commit lock.
-
- This overrides a method.
- """
- cursor.execute("SELECT GET_LOCK(CONCAT(DATABASE(), '.commit'), %s)",
- (commit_lock_timeout,))
- locked = cursor.fetchone()[0]
- if not locked:
- raise StorageError("Unable to acquire commit lock")
-
-
- def _release_commit_lock(self, cursor):
- """Release the commit lock.
-
- This overrides a method.
- """
- cursor.execute("SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.commit'))")
-
-
- _poll_query = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -14,81 +14,32 @@
"""Oracle adapter for RelStorage."""
import logging
-import re
-import time
-
import cx_Oracle
-from ZODB.POSException import StorageError
-from relstorage.adapters.historypreserving import HistoryPreservingAdapter
+from relstorage.adapters.connmanager import AbstractConnectionManager
+from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
+from relstorage.adapters.loadstore import HistoryPreservingOracleLoadStore
+from relstorage.adapters.locker import OracleLocker
+from relstorage.adapters.packundo import OracleHistoryPreservingPackUndo
+from relstorage.adapters.poller import Poller
+from relstorage.adapters.schema import HistoryPreservingOracleSchema
+from relstorage.adapters.scriptrunner import OracleScriptRunner
+from relstorage.adapters.stats import OracleStats
+from relstorage.adapters.txncontrol import OracleTransactionControl
-log = logging.getLogger("relstorage.adapters.oracle")
+log = logging.getLogger(__name__)
# disconnected_exceptions contains the exception types that might be
# raised when the connection to the database has been broken.
disconnected_exceptions = (cx_Oracle.OperationalError,
cx_Oracle.InterfaceError, cx_Oracle.DatabaseError)
-def lob_handler(cursor, name, defaultType, size, precision, scale):
- """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
- Note that if a BLOB in the result is too large, Oracle generates an
- error indicating truncation. The execute_lob_stmt() method works
- around this.
- """
- if defaultType == cx_Oracle.BLOB:
- # Default size for BLOB is 4, we want the whole blob inline.
- # Typical chunk size is 8132, we choose a multiple - 32528
- return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
-
-def read_lob(value):
- """Handle an Oracle LOB by returning its byte stream.
-
- Returns other objects unchanged.
- """
- if isinstance(value, cx_Oracle.LOB):
- return value.read()
- return value
-
-
-class OracleAdapter(HistoryPreservingAdapter):
+class OracleAdapter(object):
"""Oracle adapter for RelStorage."""
- _script_vars = {
- 'TRUE': "'Y'",
- 'FALSE': "'N'",
- 'OCTET_LENGTH': 'LENGTH',
- 'TRUNCATE': 'TRUNCATE TABLE',
- 'oid': ':oid',
- 'tid': ':tid',
- 'pack_tid': ':pack_tid',
- 'undo_tid': ':undo_tid',
- 'self_tid': ':self_tid',
- 'min_tid': ':min_tid',
- 'max_tid': ':max_tid',
- }
+ keep_history = True
- _scripts = HistoryPreservingAdapter._scripts.copy()
- _scripts.update({
- 'choose_pack_transaction': """
- SELECT MAX(tid)
- FROM transaction
- WHERE tid > 0
- AND tid <= %(tid)s
- AND packed = 'N'
- """,
-
- 'create_temp_pack_visit': None,
- 'create_temp_undo': None,
- 'reset_temp_undo': "DELETE FROM temp_undo",
-
- 'transaction_has_data': """
- SELECT DISTINCT tid
- FROM object_state
- WHERE tid = %(tid)s
- """,
- })
-
def __init__(self, user, password, dsn, twophase=False, arraysize=64,
use_inline_lobs=None):
"""Create an Oracle adapter.
@@ -107,322 +58,139 @@
queries. It depends on features in cx_Oracle 5. The default is None,
telling the adapter to auto-detect the presence of cx_Oracle 5.
"""
- self._params = (user, password, dsn)
- self._twophase = bool(twophase)
- self._arraysize = arraysize
+ params = (user, password, dsn)
if use_inline_lobs is None:
use_inline_lobs = (cx_Oracle.version >= '5.0')
- self._use_inline_lobs = bool(use_inline_lobs)
- def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
- """Execute a statement from a script with the given parameters.
+ self.connmanager = CXOracleConnectionManager(params, arraysize)
+ self.runner = CXOracleScriptRunner(bool(use_inline_lobs))
+ self.locker = OracleLocker((cx_Oracle.DatabaseError,))
+ self.schema = HistoryPreservingOracleSchema(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ )
+ self.loadstore = HistoryPreservingOracleLoadStore(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ disconnected_exceptions=disconnected_exceptions,
+ Binary=cx_Oracle.Binary,
+ inputsize_BLOB=cx_Oracle.BLOB,
+ inputsize_BINARY=cx_Oracle.BINARY,
+ twophase=bool(twophase),
+ )
+ self.txncontrol = OracleTransactionControl(
+ Binary=cx_Oracle.Binary,
+ )
+ self.poller = Poller(
+ poll_query="SELECT MAX(tid) FROM transaction",
+ keep_history=True,
+ runner=self.runner,
+ )
+ self.packundo = OracleHistoryPreservingPackUndo(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ locker=self.locker,
+ )
+ self.dbiter = HistoryPreservingDatabaseIterator(
+ runner=self.runner,
+ )
+ self.stats = OracleStats(
+ connmanager=self.connmanager,
+ )
- params should be either an empty tuple (no parameters) or
- a map.
+ self.open = self.connmanager.open
+ self.close = self.connmanager.close
- This overrides a method.
- """
- if generic_params:
- # Oracle raises ORA-01036 if the parameter map contains extra keys,
- # so filter out any unused parameters.
- tracker = TrackingMap(self._script_vars)
- stmt = generic_stmt % tracker
- used = tracker.used
- params = {}
- for k, v in generic_params.iteritems():
- if k in used:
- params[k] = v
- else:
- stmt = generic_stmt % self._script_vars
- params = ()
+ self.hold_commit_lock = self.locker.hold_commit_lock
+ self.release_commit_lock = self.locker.release_commit_lock
+ self.hold_pack_lock = self.locker.hold_pack_lock
+ self.release_pack_lock = self.locker.release_pack_lock
- try:
- cursor.execute(stmt, params)
- except:
- log.warning("script statement failed: %r; parameters: %r",
- stmt, params)
- raise
+ self.create_schema = self.schema.create
+ self.prepare_schema = self.schema.prepare
+ self.zap_all = self.schema.zap_all
+ self.drop_all = self.schema.drop_all
- def _run_many(self, cursor, stmt, items):
- """Execute a statement repeatedly. Items should be a list of tuples.
+ self.open_for_load = self.loadstore.open_for_load
+ self.restart_load = self.loadstore.restart_load
+ self.get_current_tid = self.loadstore.get_current_tid
+ self.load_current = self.loadstore.load_current
+ self.load_revision = self.loadstore.load_revision
+ self.exists = self.loadstore.exists
+ self.load_before = self.loadstore.load_before
+ self.get_object_tid_after = self.loadstore.get_object_tid_after
- stmt should use '%s' parameter format. Overrides a method.
- """
- # replace '%s' with ':n'
- matches = []
- def replace(match):
- matches.append(None)
- return ':%d' % len(matches)
- stmt = re.sub('%s', replace, stmt)
+ self.open_for_store = self.loadstore.open_for_store
+ self.restart_store = self.loadstore.restart_store
+ self.store_temp = self.loadstore.store_temp
+ self.replace_temp = self.loadstore.replace_temp
+ self.restore = self.loadstore.restore
+ self.detect_conflict = self.loadstore.detect_conflict
+ self.move_from_temp = self.loadstore.move_from_temp
+ self.update_current = self.loadstore.update_current
+ self.set_min_oid = self.loadstore.set_min_oid
+ self.new_oid = self.loadstore.new_oid
- cursor.executemany(stmt, items)
+ self.get_tid_and_time = self.txncontrol.get_tid_and_time
+ self.add_transaction = self.txncontrol.add_transaction
+ self.commit_phase1 = self.txncontrol.commit_phase1
+ self.commit_phase2 = self.txncontrol.commit_phase2
+ self.abort = self.txncontrol.abort
- def create_schema(self, cursor):
- """Create the database tables."""
- stmt = """
- CREATE TABLE commit_lock (dummy CHAR);
+ self.poll_invalidations = self.poller.poll_invalidations
- -- The list of all transactions in the database
- CREATE TABLE transaction (
- tid NUMBER(20) NOT NULL PRIMARY KEY,
- packed CHAR DEFAULT 'N' CHECK (packed IN ('N', 'Y')),
- empty CHAR DEFAULT 'N' CHECK (empty IN ('N', 'Y')),
- username RAW(500),
- description RAW(2000),
- extension RAW(2000)
- );
+ self.fill_object_refs = self.packundo.fill_object_refs
+ self.open_for_pre_pack = self.packundo.open_for_pre_pack
+ self.choose_pack_transaction = self.packundo.choose_pack_transaction
+ self.pre_pack = self.packundo.pre_pack
+ self.pack = self.packundo.pack
+ self.verify_undoable = self.packundo.verify_undoable
+ self.undo = self.packundo.undo
- -- Create a special transaction to represent object creation. This
- -- row is often referenced by object_state.prev_tid, but never by
- -- object_state.tid.
- INSERT INTO transaction (tid, username, description)
- VALUES (0,
- UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
- UTL_I18N.STRING_TO_RAW(
- 'special transaction for object creation', 'US7ASCII'));
+ self.iter_objects = self.dbiter.iter_objects
+ self.iter_transactions = self.dbiter.iter_transactions
+ self.iter_transactions_range = self.dbiter.iter_transactions_range
+ self.iter_object_history = self.dbiter.iter_object_history
- CREATE SEQUENCE zoid_seq;
+ self.get_object_count = self.stats.get_object_count
+ self.get_db_size = self.stats.get_db_size
- -- All object states in all transactions.
- -- md5 and state can be null to represent object uncreation.
- CREATE TABLE object_state (
- zoid NUMBER(20) NOT NULL,
- tid NUMBER(20) NOT NULL REFERENCES transaction
- CHECK (tid > 0),
- PRIMARY KEY (zoid, tid),
- prev_tid NUMBER(20) NOT NULL REFERENCES transaction,
- md5 CHAR(32),
- state BLOB
- );
- CREATE INDEX object_state_tid ON object_state (tid);
- CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
- -- Pointers to the current object state
- CREATE TABLE current_object (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- tid NUMBER(20) NOT NULL,
- FOREIGN KEY (zoid, tid) REFERENCES object_state
- );
- CREATE INDEX current_object_tid ON current_object (tid);
+class CXOracleScriptRunner(OracleScriptRunner):
- -- States that will soon be stored
- CREATE GLOBAL TEMPORARY TABLE temp_store (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- prev_tid NUMBER(20) NOT NULL,
- md5 CHAR(32),
- state BLOB
- ) ON COMMIT DELETE ROWS;
+ def __init__(self, use_inline_lobs):
+ self.use_inline_lobs = use_inline_lobs
- -- During packing, an exclusive lock is held on pack_lock.
- CREATE TABLE pack_lock (dummy CHAR);
+ def _outputtypehandler(self,
+ cursor, name, defaultType, size, precision, scale):
+ """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
- -- A list of referenced OIDs from each object_state.
- -- This table is populated as needed during packing.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because rows in object_state
- -- are never modified once committed, and rows are removed
- -- from object_state only by packing.
- CREATE TABLE object_ref (
- zoid NUMBER(20) NOT NULL,
- tid NUMBER(20) NOT NULL,
- to_zoid NUMBER(20) NOT NULL,
- PRIMARY KEY (tid, zoid, to_zoid)
- );
-
- -- The object_refs_added table tracks whether object_refs has
- -- been populated for all states in a given transaction.
- -- An entry is added only when the work is finished.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because object states
- -- are never added to a transaction once committed, and
- -- rows are removed from the transaction table only by
- -- packing.
- CREATE TABLE object_refs_added (
- tid NUMBER(20) NOT NULL PRIMARY KEY
- );
-
- -- Temporary state during packing:
- -- The list of objects to pack. If keep is 'N',
- -- the object and all its revisions will be removed.
- -- If keep is 'Y', instead of removing the object,
- -- the pack operation will cut the object's history.
- -- The keep_tid field specifies the oldest revision
- -- of the object to keep.
- -- The visited flag is set when pre_pack is visiting an object's
- -- references, and remains set.
- CREATE TABLE pack_object (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- keep CHAR NOT NULL CHECK (keep IN ('N', 'Y')),
- keep_tid NUMBER(20) NOT NULL,
- visited CHAR DEFAULT 'N' NOT NULL CHECK (visited IN ('N', 'Y'))
- );
- CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
-
- -- Temporary state during packing: the list of object states to pack.
- CREATE TABLE pack_state (
- tid NUMBER(20) NOT NULL,
- zoid NUMBER(20) NOT NULL,
- PRIMARY KEY (tid, zoid)
- );
-
- -- Temporary state during packing: the list of transactions that
- -- have at least one object state to pack.
- CREATE TABLE pack_state_tid (
- tid NUMBER(20) NOT NULL PRIMARY KEY
- );
-
- -- Temporary state during packing: a list of objects
- -- whose references need to be examined.
- CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- keep_tid NUMBER(20)
- );
-
- -- Temporary state during undo: a list of objects
- -- to be undone and the tid of the undone state.
- CREATE GLOBAL TEMPORARY TABLE temp_undo (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- prev_tid NUMBER(20) NOT NULL
- );
+ Note that if a BLOB in the result is too large, Oracle generates an
+ error indicating truncation. The run_lob_stmt() method works
+ around this.
"""
- self._run_script(cursor, stmt)
- # Let Oracle catch up with the new data definitions by sleeping.
- # This reduces the likelihood of spurious ORA-01466 errors.
- time.sleep(5)
+ if defaultType == cx_Oracle.BLOB:
+ # Default size for BLOB is 4, we want the whole blob inline.
+ # Typical chunk size is 8132, we choose a multiple - 32528
+ return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
+ def _read_lob(self, value):
+ """Handle an Oracle LOB by returning its byte stream.
- def prepare_schema(self):
- """Create the database schema if it does not already exist."""
- def callback(conn, cursor):
- cursor.execute("""
- SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
- """)
- if not cursor.fetchall():
- self.create_schema(cursor)
- self._open_and_call(callback)
-
- def zap_all(self):
- """Clear all data out of the database."""
- def callback(conn, cursor):
- stmt = """
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- DELETE FROM transaction;
- -- Create a transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
- UTL_I18N.STRING_TO_RAW(
- 'special transaction for object creation', 'US7ASCII'));
- DROP SEQUENCE zoid_seq;
- CREATE SEQUENCE zoid_seq;
- """
- self._run_script(cursor, stmt)
- self._open_and_call(callback)
-
- def drop_all(self):
- """Drop all tables and sequences."""
- def callback(conn, cursor):
- for tablename in ('pack_state_tid', 'pack_state',
- 'pack_object', 'object_refs_added', 'object_ref',
- 'current_object', 'object_state', 'transaction',
- 'commit_lock', 'pack_lock',
- 'temp_store', 'temp_undo', 'temp_pack_visit'):
- cursor.execute("DROP TABLE %s" % tablename)
- cursor.execute("DROP SEQUENCE zoid_seq")
- self._open_and_call(callback)
-
- def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
- twophase=False):
- """Open a database connection and return (conn, cursor)."""
- try:
- kw = {'twophase': twophase} #, 'threaded': True}
- conn = cx_Oracle.connect(*self._params, **kw)
- cursor = conn.cursor()
- cursor.arraysize = self._arraysize
- if transaction_mode:
- cursor.execute("SET TRANSACTION %s" % transaction_mode)
- return conn, cursor
-
- except cx_Oracle.OperationalError, e:
- log.warning("Unable to connect: %s", e)
- raise
-
- def close(self, conn, cursor):
- """Close both a cursor and connection, ignoring certain errors."""
- for obj in (cursor, conn):
- if obj is not None:
- try:
- obj.close()
- except disconnected_exceptions:
- pass
-
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
+ Returns other objects unchanged.
"""
- return self.open('READ ONLY')
+ if isinstance(value, cx_Oracle.LOB):
+ return value.read()
+ return value
- def restart_load(self, cursor):
- """Reinitialize a connection for loading objects."""
- try:
- cursor.connection.rollback()
- cursor.execute("SET TRANSACTION READ ONLY")
- except disconnected_exceptions, e:
- raise StorageError(e)
-
- def get_object_count(self):
- """Returns the number of objects in the database"""
- # The tests expect an exact number, but the code below generates
- # an estimate, so this is disabled for now.
- if True:
- return 0
- else:
- conn, cursor = self.open('READ ONLY')
- try:
- cursor.execute("""
- SELECT NUM_ROWS
- FROM USER_TABLES
- WHERE TABLE_NAME = 'CURRENT_OBJECT'
- """)
- res = cursor.fetchone()[0]
- if res is None:
- res = 0
- else:
- res = int(res)
- return res
- finally:
- self.close(conn, cursor)
-
- def get_db_size(self):
- """Returns the approximate size of the database in bytes"""
- # May not be possible without access to the dba_* objects
- return 0
-
- def get_current_tid(self, cursor, oid):
- """Returns the current integer tid for an object.
-
- oid is an integer. Returns None if object does not exist.
- """
- cursor.execute("""
- SELECT tid
- FROM current_object
- WHERE zoid = :1
- """, (oid,))
- for (tid,) in cursor:
- return tid
- return None
-
- def execute_lob_stmt(self, cursor, stmt, args=(), default=None):
+ def run_lob_stmt(self, cursor, stmt, args=(), default=None):
"""Execute a statement and return one row with all LOBs inline.
Returns the value of the default parameter if the result was empty.
"""
- if self._use_inline_lobs:
+ if self.use_inline_lobs:
try:
- cursor.outputtypehandler = lob_handler
+ cursor.outputtypehandler = self._outputtypehandler
try:
cursor.execute(stmt, args)
for row in cursor:
@@ -441,357 +209,37 @@
# with different output type parameters.
cursor.execute(stmt + ' ', args)
for row in cursor:
- return tuple(map(read_lob, row))
+ return tuple(map(self._read_lob, row))
else:
cursor.execute(stmt, args)
for row in cursor:
- return tuple(map(read_lob, row))
+ return tuple(map(self._read_lob, row))
return default
- def load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
- oid is an integer. Returns (None, None) if object does not exist.
- """
- stmt = """
- SELECT state, tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = :1
- """
- return self.execute_lob_stmt(
- cursor, stmt, (oid,), default=(None, None))
+class CXOracleConnectionManager(AbstractConnectionManager):
- def load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
+ isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
+ isolation_read_only = "READ ONLY"
- Returns None if no such state exists.
- """
- stmt = """
- SELECT state
- FROM object_state
- WHERE zoid = :1
- AND tid = :2
- """
- (state,) = self.execute_lob_stmt(
- cursor, stmt, (oid, tid), default=(None,))
- return state
+ close_exceptions = disconnected_exceptions
- def exists(self, cursor, oid):
- """Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
- return len(list(cursor))
+ def __init__(self, params, arraysize):
+ self._params = params
+ self._arraysize = arraysize
- def load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- stmt = """
- SELECT state, tid
- FROM object_state
- WHERE zoid = :oid
- AND tid = (
- SELECT MAX(tid)
- FROM object_state
- WHERE zoid = :oid
- AND tid < :tid
- )
- """
- return self.execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
- default=(None, None))
-
- def get_object_tid_after(self, cursor, oid, tid):
- """Returns the tid of the next change after an object revision.
-
- Returns None if no later state exists.
- """
- stmt = """
- SELECT MIN(tid)
- FROM object_state
- WHERE zoid = :1
- AND tid > :2
- """
- cursor.execute(stmt, (oid, tid))
- rows = cursor.fetchall()
- if rows:
- assert len(rows) == 1
- return rows[0][0]
- else:
- return None
-
- def _set_xid(self, cursor):
- """Set up a distributed transaction"""
- stmt = """
- SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
- """
- cursor.execute(stmt)
- xid = str(cursor.fetchone()[0])
- cursor.connection.begin(0, xid, '0')
-
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- if self._twophase:
- conn, cursor = self.open(transaction_mode=None, twophase=True)
- try:
- self._set_xid(cursor)
- except:
- self.close(conn, cursor)
- raise
- else:
- conn, cursor = self.open()
- return conn, cursor
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
+ def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
+ twophase=False):
+ """Open a database connection and return (conn, cursor)."""
try:
- cursor.connection.rollback()
- if self._twophase:
- self._set_xid(cursor)
- except disconnected_exceptions, e:
- raise StorageError(e)
+ kw = {'twophase': twophase} #, 'threaded': True}
+ conn = cx_Oracle.connect(*self._params, **kw)
+ cursor = conn.cursor()
+ cursor.arraysize = self._arraysize
+ if transaction_mode:
+ cursor.execute("SET TRANSACTION %s" % transaction_mode)
+ return conn, cursor
- def store_temp(self, cursor, oid, prev_tid, data):
- """Store an object in the temporary table."""
- md5sum = self.md5sum(data)
- cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
- if len(data) <= 2000:
- # Send data inline for speed. Oracle docs say maximum size
- # of a RAW is 2000 bytes. cx_Oracle.BINARY corresponds with RAW.
- cursor.setinputsizes(rawdata=cx_Oracle.BINARY)
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (:oid, :prev_tid, :md5sum, :rawdata)
- """
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, rawdata=data)
- else:
- # Send data as a BLOB
- cursor.setinputsizes(blobdata=cx_Oracle.BLOB)
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (:oid, :prev_tid, :md5sum, :blobdata)
- """
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, blobdata=data)
-
- def replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table."""
- md5sum = self.md5sum(data)
- cursor.setinputsizes(data=cx_Oracle.BLOB)
- stmt = """
- UPDATE temp_store SET
- prev_tid = :prev_tid,
- md5 = :md5sum,
- state = :data
- WHERE zoid = :oid
- """
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, data=cx_Oracle.Binary(data))
-
- def restore(self, cursor, oid, tid, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- md5sum = self.md5sum(data)
- cursor.setinputsizes(data=cx_Oracle.BLOB)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (:oid, :tid,
- COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
- :md5sum, :data)
- """
- if data is not None:
- data = cx_Oracle.Binary(data)
- cursor.execute(stmt, oid=oid, tid=tid, md5sum=md5sum, data=data)
-
- def start_commit(self, cursor):
- """Prepare to commit."""
- # Hold commit_lock to prevent concurrent commits
- # (for as short a time as possible).
- # Lock transaction and current_object in share mode to ensure
- # conflict detection has the most current data.
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
- cursor.execute("LOCK TABLE transaction IN SHARE MODE")
- cursor.execute("LOCK TABLE current_object IN SHARE MODE")
-
- def _parse_dsinterval(self, s):
- """Convert an Oracle dsinterval (as a string) to a float."""
- mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
- if not mo:
- raise ValueError(s)
- day, hour, min, sec = [float(v) for v in mo.groups()]
- return day * 86400 + hour * 3600 + min * 60 + sec
-
- def get_tid_and_time(self, cursor):
- """Returns the most recent tid and the current database time.
-
- The database time is the number of seconds since the epoch.
- """
- cursor.execute("""
- SELECT MAX(tid), TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
- '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
- FROM transaction
- """)
- tid, now = cursor.fetchone()
- return tid, self._parse_dsinterval(now)
-
- def add_transaction(self, cursor, tid, username, description, extension,
- packed=False):
- """Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (:1, :2, :3, :4, :5)
- """
- max_desc_len = 2000
- if len(description) > max_desc_len:
- log.warning('Trimming description of transaction %s '
- 'to %d characters', tid, max_desc_len)
- description = description[:max_desc_len]
- cursor.execute(stmt, (
- tid, packed and 'Y' or 'N', cx_Oracle.Binary(username),
- cx_Oracle.Binary(description), cx_Oracle.Binary(extension)))
-
- def detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- temp_store.state
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- """
- return self.execute_lob_stmt(cursor, stmt)
-
- def move_from_temp(self, cursor, tid):
- """Move the temporarily stored objects to permanent storage.
-
- Returns the list of oids stored.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT zoid, :tid, prev_tid, md5, state
- FROM temp_store
- """
- cursor.execute(stmt, tid=tid)
-
- stmt = """
- SELECT zoid FROM temp_store
- """
- cursor.execute(stmt)
- return [oid for (oid,) in cursor]
-
- def update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- # Insert objects created in this transaction into current_object.
- stmt = """
- INSERT INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = :1
- AND prev_tid = 0
- """
- cursor.execute(stmt, (tid,))
-
- # Change existing objects.
- stmt = """
- UPDATE current_object SET tid = :1
- WHERE zoid IN (
- SELECT zoid FROM object_state
- WHERE tid = :1
- AND prev_tid != 0
- )
- """
- cursor.execute(stmt, (tid,))
-
- def set_min_oid(self, cursor, oid):
- """Ensure the next OID is at least the given OID."""
- next_oid = self.new_oid(cursor)
- if next_oid < oid:
- # Oracle provides no way modify the sequence value
- # except through alter sequence or drop/create sequence,
- # but either statement kills the current transaction.
- # Therefore, open a temporary connection to make the
- # alteration.
- conn2, cursor2 = self.open()
- try:
- # Change the sequence by altering the increment.
- # (this is safer than dropping and re-creating the sequence)
- diff = oid - next_oid
- cursor2.execute(
- "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
- cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
- cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
- conn2.commit()
- finally:
- self.close(conn2, cursor2)
-
- def commit_phase1(self, cursor, tid):
- """Begin a commit. Returns the transaction name.
-
- This method should guarantee that commit_phase2() will succeed,
- meaning that if commit_phase2() would raise any error, the error
- should be raised in commit_phase1() instead.
- """
- if self._twophase:
- cursor.connection.prepare()
- return '-'
-
- def commit_phase2(self, cursor, txn):
- """Final transaction commit."""
- cursor.connection.commit()
-
- def abort(self, cursor, txn=None):
- """Abort the commit. If txn is not None, phase 1 is also aborted."""
- cursor.connection.rollback()
-
-
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "SELECT zoid_seq.nextval FROM DUAL"
- cursor.execute(stmt)
- return cursor.fetchone()[0]
-
-
- def hold_pack_lock(self, cursor):
- """Try to acquire the pack lock.
-
- Raise an exception if packing or undo is already in progress.
- """
- stmt = """
- LOCK TABLE pack_lock IN EXCLUSIVE MODE NOWAIT
- """
- try:
- cursor.execute(stmt)
- except cx_Oracle.DatabaseError:
- raise StorageError('A pack or undo operation is in progress')
-
- def release_pack_lock(self, cursor):
- """Release the pack lock."""
- # No action needed
- pass
-
- _poll_query = "SELECT MAX(tid) FROM transaction"
-
-
-class TrackingMap:
- """Provides values for keys while tracking which keys are accessed."""
-
- def __init__(self, source):
- self.source = source
- self.used = set()
-
- def __getitem__(self, key):
- self.used.add(key)
- return self.source[key]
+ except cx_Oracle.OperationalError, e:
+ log.warning("Unable to connect: %s", e)
+ raise
Added: relstorage/trunk/relstorage/adapters/packundo.py
===================================================================
--- relstorage/trunk/relstorage/adapters/packundo.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/packundo.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,1053 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Pack/Undo implementations.
+"""
+
+from relstorage.adapters.interfaces import IPackUndo
+from ZODB.POSException import UndoError
+from zope.interface import implements
+import logging
+import time
+
+log = logging.getLogger(__name__)
+
+
+class PackUndo(object):
+ """Abstract base class for pack/undo"""
+
+ verify_sane_database = False
+
+ def __init__(self, connmanager, runner, locker):
+ self.connmanager = connmanager
+ self.runner = runner
+ self.locker = locker
+
+ def fill_object_refs(self, conn, cursor, get_references):
+ """Update the object_refs table by analyzing new transactions."""
+ if self.keep_history:
+ stmt = """
+ SELECT transaction.tid
+ FROM transaction
+ LEFT JOIN object_refs_added
+ ON (transaction.tid = object_refs_added.tid)
+ WHERE object_refs_added.tid IS NULL
+ ORDER BY transaction.tid
+ """
+ else:
+ stmt = """
+ SELECT transaction.tid
+ FROM (SELECT DISTINCT tid FROM object_state) AS transaction
+ LEFT JOIN object_refs_added
+ ON (transaction.tid = object_refs_added.tid)
+ WHERE object_refs_added.tid IS NULL
+ ORDER BY transaction.tid
+ """
+
+ self.runner.run_script_stmt(cursor, stmt)
+ tids = [tid for (tid,) in cursor]
+ if tids:
+ added = 0
+ log.info("discovering references from objects in %d "
+ "transaction(s)" % len(tids))
+ for tid in tids:
+ added += self._add_refs_for_tid(cursor, tid, get_references)
+ if added >= 10000:
+ # save the work done so far
+ conn.commit()
+ added = 0
+ if added:
+ conn.commit()
+
+ def _add_refs_for_tid(self, cursor, tid, get_references):
+ """Fill object_refs with all states for a transaction.
+
+ Returns the number of references added.
+ """
+ log.debug("pre_pack: transaction %d: computing references ", tid)
+ from_count = 0
+
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+
+ add_rows = [] # [(from_oid, tid, to_oid)]
+ for from_oid, state in cursor:
+ if hasattr(state, 'read'):
+ # Oracle
+ state = state.read()
+ if state:
+ from_count += 1
+ try:
+ to_oids = get_references(str(state))
+ except:
+ log.error("pre_pack: can't unpickle "
+ "object %d in transaction %d; state length = %d" % (
+ from_oid, tid, len(state)))
+ raise
+ if self.keep_history:
+ for to_oid in to_oids:
+ add_rows.append((from_oid, tid, to_oid))
+ else:
+ for to_oid in to_oids:
+ add_rows.append((from_oid, to_oid))
+
+ if self.keep_history:
+ stmt = """
+ INSERT INTO object_ref (zoid, tid, to_zoid)
+ VALUES (%s, %s, %s)
+ """
+ self.runner.run_many(cursor, stmt, add_rows)
+
+ else:
+ stmt = """
+ DELETE FROM object_ref
+ WHERE zoid in (
+ SELECT zoid
+ FROM object_state
+ WHERE tid = %(tid)s
+ )
+ """
+ self.runner.run_script(cursor, stmt, {'tid': tid})
+
+ stmt = """
+ INSERT INTO object_ref (zoid, to_zoid)
+ VALUES (%s, %s)
+ """
+ self.runner.run_many(cursor, stmt, add_rows)
+
+ # The references have been computed for this transaction.
+ stmt = """
+ INSERT INTO object_refs_added (tid)
+ VALUES (%(tid)s)
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+
+ to_count = len(add_rows)
+ log.debug("pre_pack: transaction %d: has %d reference(s) "
+ "from %d object(s)", tid, to_count, from_count)
+ return to_count
+
+
+ def _visit_all_references(self, cursor):
+ """Visit all references in pack_object and set the keep flags.
+ """
+ # Each of the objects to be kept might refer to other objects.
+ # Mark the referenced objects to be kept as well. Do this
+ # repeatedly until all references have been satisfied.
+ pass_num = 1
+ while True:
+ log.info("pre_pack: following references, pass %d", pass_num)
+
+ # Make a list of all parent objects that still need to be
+ # visited. Then set pack_object.visited for all pack_object
+ # rows with keep = true.
+ stmt = """
+ %(TRUNCATE)s temp_pack_visit;
+
+ INSERT INTO temp_pack_visit (zoid, keep_tid)
+ SELECT zoid, keep_tid
+ FROM pack_object
+ WHERE keep = %(TRUE)s
+ AND visited = %(FALSE)s;
+
+ UPDATE pack_object SET visited = %(TRUE)s
+ WHERE keep = %(TRUE)s
+ AND visited = %(FALSE)s
+ """
+ self.runner.run_script(cursor, stmt)
+ visit_count = cursor.rowcount
+
+ if self.verify_sane_database:
+ # Verify the update actually worked.
+ # MySQL 5.1.23 fails this test; 5.1.24 passes.
+ stmt = """
+ SELECT 1
+ FROM pack_object
+ WHERE keep = %(TRUE)s AND visited = %(FALSE)s
+ """
+ self.runner.run_script_stmt(cursor, stmt)
+ if list(cursor):
+ raise AssertionError(
+ "database failed to update pack_object")
+
+ log.debug("pre_pack: checking references from %d object(s)",
+ visit_count)
+
+ # Visit the children of all parent objects that were
+ # just visited.
+ stmt = self._script_pre_pack_follow_child_refs
+ self.runner.run_script(cursor, stmt)
+ found_count = cursor.rowcount
+
+ log.debug("pre_pack: found %d more referenced object(s) in "
+ "pass %d", found_count, pass_num)
+ if not found_count:
+ # No new references detected.
+ break
+ else:
+ pass_num += 1
+
+ def _pause_pack(self, sleep, options, start):
+ """Pause packing to allow concurrent commits."""
+ if sleep is None:
+ sleep = time.sleep
+ elapsed = time.time() - start
+ if elapsed == 0.0:
+ # Compensate for low timer resolution by
+ # assuming that at least 10 ms elapsed.
+ elapsed = 0.01
+ duty_cycle = options.pack_duty_cycle
+ if duty_cycle > 0.0 and duty_cycle < 1.0:
+ delay = min(options.pack_max_delay,
+ elapsed * (1.0 / duty_cycle - 1.0))
+ if delay > 0:
+ log.debug('pack: sleeping %.4g second(s)', delay)
+ sleep(delay)
+
+ def open_for_pre_pack(self):
+ """Open a connection to be used for the pre-pack phase.
+ Returns (conn, cursor).
+
+ Subclasses may override this.
+ """
+ return self.connmanager.open()
+
+
+class HistoryPreservingPackUndo(PackUndo):
+ implements(IPackUndo)
+
+ keep_history = True
+
+ _script_create_temp_pack_visit = """
+ CREATE TEMPORARY TABLE temp_pack_visit (
+ zoid BIGINT NOT NULL,
+ keep_tid BIGINT
+ );
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+ """
+
+ _script_pre_pack_follow_child_refs = """
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN temp_pack_visit USING (zoid)
+ WHERE object_ref.tid >= temp_pack_visit.keep_tid
+ )
+ """
+
+ _script_choose_pack_transaction = """
+ SELECT tid
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND packed = FALSE
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+
+ _script_create_temp_undo = """
+ CREATE TEMPORARY TABLE temp_undo (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL
+ );
+ CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
+ """
+
+ _script_reset_temp_undo = "DROP TABLE temp_undo"
+
+ _script_transaction_has_data = """
+ SELECT tid
+ FROM object_state
+ WHERE tid = %(tid)s
+ LIMIT 1
+ """
+
+ _script_pack_current_object = """
+ DELETE FROM current_object
+ WHERE tid = %(tid)s
+ AND zoid in (
+ SELECT pack_state.zoid
+ FROM pack_state
+ WHERE pack_state.tid = %(tid)s
+ )
+ """
+
+ _script_pack_object_state = """
+ DELETE FROM object_state
+ WHERE tid = %(tid)s
+ AND zoid in (
+ SELECT pack_state.zoid
+ FROM pack_state
+ WHERE pack_state.tid = %(tid)s
+ )
+ """
+
+ _script_pack_object_ref = """
+ DELETE FROM object_refs_added
+ WHERE tid IN (
+ SELECT tid
+ FROM transaction
+ WHERE empty = %(TRUE)s
+ );
+ DELETE FROM object_ref
+ WHERE tid IN (
+ SELECT tid
+ FROM transaction
+ WHERE empty = %(TRUE)s
+ )
+ """
+
+ def verify_undoable(self, cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn."""
+ stmt = """
+ SELECT 1 FROM transaction
+ WHERE tid = %(undo_tid)s
+ AND packed = %(FALSE)s
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if not cursor.fetchall():
+ raise UndoError("Transaction not found or packed")
+
+ # Rule: we can undo an object if the object's state in the
+ # transaction to undo matches the object's current state.
+ # If any object in the transaction does not fit that rule,
+ # refuse to undo.
+ stmt = """
+ SELECT prev_os.zoid, current_object.tid
+ FROM object_state prev_os
+ JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
+ JOIN current_object ON (cur_os.zoid = current_object.zoid
+ AND cur_os.tid = current_object.tid)
+ WHERE prev_os.tid = %(undo_tid)s
+ AND cur_os.md5 != prev_os.md5
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if cursor.fetchmany():
+ raise UndoError(
+ "Some data were modified by a later transaction")
+
+ # Rule: don't allow the creation of the root object to
+ # be undone. It's hard to get it back.
+ stmt = """
+ SELECT 1
+ FROM object_state
+ WHERE tid = %(undo_tid)s
+ AND zoid = 0
+ AND prev_tid = 0
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if cursor.fetchall():
+ raise UndoError("Can't undo the creation of the root object")
+
+
+ def undo(self, cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the states copied forward by the undo operation as a
+ list of (oid, old_tid).
+ """
+ stmt = self._script_create_temp_undo
+ if stmt:
+ self.runner.run_script(cursor, stmt)
+
+ stmt = """
+ DELETE FROM temp_undo;
+
+ -- Put into temp_undo the list of objects to be undone and
+ -- the tid of the transaction that has the undone state.
+ INSERT INTO temp_undo (zoid, prev_tid)
+ SELECT zoid, prev_tid
+ FROM object_state
+ WHERE tid = %(undo_tid)s;
+
+ -- Override previous undo operations within this transaction
+ -- by resetting the current_object pointer and deleting
+ -- copied states from object_state.
+ UPDATE current_object
+ SET tid = (
+ SELECT prev_tid
+ FROM object_state
+ WHERE zoid = current_object.zoid
+ AND tid = %(self_tid)s
+ )
+ WHERE zoid IN (SELECT zoid FROM temp_undo)
+ AND tid = %(self_tid)s;
+
+ DELETE FROM object_state
+ WHERE zoid IN (SELECT zoid FROM temp_undo)
+ AND tid = %(self_tid)s;
+
+ -- Copy old states forward.
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
+ prev.md5, prev.state
+ FROM temp_undo
+ JOIN current_object ON (temp_undo.zoid = current_object.zoid)
+ LEFT JOIN object_state prev
+ ON (prev.zoid = temp_undo.zoid
+ AND prev.tid = temp_undo.prev_tid);
+
+ -- List the copied states.
+ SELECT zoid, prev_tid FROM temp_undo
+ """
+ self.runner.run_script(cursor, stmt,
+ {'undo_tid': undo_tid, 'self_tid': self_tid})
+ res = list(cursor)
+
+ stmt = self._script_reset_temp_undo
+ if stmt:
+ self.runner.run_script(cursor, stmt)
+
+ return res
+
+ def choose_pack_transaction(self, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ conn, cursor = self.connmanager.open()
+ try:
+ stmt = self._script_choose_pack_transaction
+ self.runner.run_script(cursor, stmt, {'tid': pack_point})
+ rows = cursor.fetchall()
+ if not rows:
+ # Nothing needs to be packed.
+ return None
+ return rows[0][0]
+ finally:
+ self.connmanager.close(conn, cursor)
+
+
+ def pre_pack(self, pack_tid, get_references, options):
+ """Decide what to pack.
+
+ tid specifies the most recent transaction to pack.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+
+ options is an instance of relstorage.Options.
+ The options.pack_gc flag indicates whether to run garbage collection.
+ If pack_gc is false, at least one revision of every object is kept,
+ even if nothing refers to it. Packing with pack_gc disabled can be
+ much faster.
+ """
+ conn, cursor = self.open_for_pre_pack()
+ try:
+ try:
+ if options.pack_gc:
+ log.info("pre_pack: start with gc enabled")
+ self._pre_pack_with_gc(
+ conn, cursor, pack_tid, get_references)
+ else:
+ log.info("pre_pack: start without gc")
+ self._pre_pack_without_gc(
+ conn, cursor, pack_tid)
+ conn.commit()
+
+ log.info("pre_pack: enumerating states to pack")
+ stmt = "%(TRUNCATE)s pack_state"
+ self.runner.run_script_stmt(cursor, stmt)
+ to_remove = 0
+
+ if options.pack_gc:
+ # Pack objects with the keep flag set to false.
+ stmt = """
+ INSERT INTO pack_state (tid, zoid)
+ SELECT tid, zoid
+ FROM object_state
+ JOIN pack_object USING (zoid)
+ WHERE keep = %(FALSE)s
+ AND tid > 0
+ AND tid <= %(pack_tid)s
+ """
+ self.runner.run_script_stmt(
+ cursor, stmt, {'pack_tid': pack_tid})
+ to_remove += cursor.rowcount
+
+ # Pack object states with the keep flag set to true.
+ stmt = """
+ INSERT INTO pack_state (tid, zoid)
+ SELECT tid, zoid
+ FROM object_state
+ JOIN pack_object USING (zoid)
+ WHERE keep = %(TRUE)s
+ AND tid > 0
+ AND tid != keep_tid
+ AND tid <= %(pack_tid)s
+ """
+ self.runner.run_script_stmt(
+ cursor, stmt, {'pack_tid':pack_tid})
+ to_remove += cursor.rowcount
+
+ log.info("pre_pack: enumerating transactions to pack")
+ stmt = "%(TRUNCATE)s pack_state_tid"
+ self.runner.run_script_stmt(cursor, stmt)
+ stmt = """
+ INSERT INTO pack_state_tid (tid)
+ SELECT DISTINCT tid
+ FROM pack_state
+ """
+ cursor.execute(stmt)
+
+ log.info("pre_pack: will remove %d object state(s)",
+ to_remove)
+
+ except:
+ log.exception("pre_pack: failed")
+ conn.rollback()
+ raise
+ else:
+ log.info("pre_pack: finished successfully")
+ conn.commit()
+ finally:
+ self.connmanager.close(conn, cursor)
+
+
+ def _pre_pack_without_gc(self, conn, cursor, pack_tid):
+ """Determine what to pack, without garbage collection.
+
+ With garbage collection disabled, there is no need to follow
+ object references.
+ """
+ # Fill the pack_object table with OIDs, but configure them
+ # all to be kept by setting keep to true.
+ log.debug("pre_pack: populating pack_object")
+ stmt = """
+ %(TRUNCATE)s pack_object;
+
+ INSERT INTO pack_object (zoid, keep, keep_tid)
+ SELECT zoid, %(TRUE)s, MAX(tid)
+ FROM object_state
+ WHERE tid > 0 AND tid <= %(pack_tid)s
+ GROUP BY zoid
+ """
+ self.runner.run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+
+ def _pre_pack_with_gc(self, conn, cursor, pack_tid, get_references):
+ """Determine what to pack, with garbage collection.
+ """
+ stmt = self._script_create_temp_pack_visit
+ if stmt:
+ self.runner.run_script(cursor, stmt)
+
+ self.fill_object_refs(conn, cursor, get_references)
+
+ log.info("pre_pack: filling the pack_object table")
+ # Fill the pack_object table with OIDs that either will be
+ # removed (if nothing references the OID) or whose history will
+ # be cut.
+ stmt = """
+ %(TRUNCATE)s pack_object;
+
+ INSERT INTO pack_object (zoid, keep, keep_tid)
+ SELECT zoid, %(FALSE)s, MAX(tid)
+ FROM object_state
+ WHERE tid > 0 AND tid <= %(pack_tid)s
+ GROUP BY zoid;
+
+ -- If the root object is in pack_object, keep it.
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE zoid = 0;
+
+ -- Keep objects that have been revised since pack_tid.
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(pack_tid)s
+ );
+
+ -- Keep objects that are still referenced by object states in
+ -- transactions that will not be packed.
+ -- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
+ INSERT INTO temp_pack_visit (zoid)
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ WHERE tid > %(pack_tid)s;
+
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM temp_pack_visit
+ );
+
+ %(TRUNCATE)s temp_pack_visit;
+ """
+ self.runner.run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+ # Set the 'keep' flags in pack_object
+ self._visit_all_references(cursor)
+
+
+ def pack(self, pack_tid, options, sleep=None, packed_func=None):
+ """Pack. Requires the information provided by pre_pack."""
+
+ # Read committed mode is sufficient.
+ conn, cursor = self.connmanager.open()
+ try:
+ try:
+ stmt = """
+ SELECT transaction.tid,
+ CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
+ CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
+ FROM transaction
+ LEFT JOIN pack_state_tid ON (
+ transaction.tid = pack_state_tid.tid)
+ WHERE transaction.tid > 0
+ AND transaction.tid <= %(pack_tid)s
+ AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
+ """
+ self.runner.run_script_stmt(
+ cursor, stmt, {'pack_tid': pack_tid})
+ tid_rows = list(cursor)
+ tid_rows.sort() # oldest first
+
+ log.info("pack: will pack %d transaction(s)", len(tid_rows))
+
+ stmt = self._script_create_temp_pack_visit
+ if stmt:
+ self.runner.run_script(cursor, stmt)
+
+ # Hold the commit lock while packing to prevent deadlocks.
+ # Pack in small batches of transactions in order to minimize
+ # the interruption of concurrent write operations.
+ start = time.time()
+ packed_list = []
+ self.locker.hold_commit_lock(cursor)
+ for tid, packed, has_removable in tid_rows:
+ self._pack_transaction(
+ cursor, pack_tid, tid, packed, has_removable,
+ packed_list)
+ if time.time() >= start + options.pack_batch_timeout:
+ conn.commit()
+ if packed_func is not None:
+ for oid, tid in packed_list:
+ packed_func(oid, tid)
+ del packed_list[:]
+ self.locker.release_commit_lock(cursor)
+ self._pause_pack(sleep, options, start)
+ self.locker.hold_commit_lock(cursor)
+ start = time.time()
+ if packed_func is not None:
+ for oid, tid in packed_list:
+ packed_func(oid, tid)
+ packed_list = None
+
+ self._pack_cleanup(conn, cursor)
+
+ except:
+ log.exception("pack: failed")
+ conn.rollback()
+ raise
+
+ else:
+ log.info("pack: finished successfully")
+ conn.commit()
+
+ finally:
+ self.connmanager.close(conn, cursor)
+
+
+ def _pack_transaction(self, cursor, pack_tid, tid, packed,
+ has_removable, packed_list):
+ """Pack one transaction. Requires populated pack tables."""
+ log.debug("pack: transaction %d: packing", tid)
+ removed_objects = 0
+ removed_states = 0
+
+ if has_removable:
+ stmt = self._script_pack_current_object
+ self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+ removed_objects = cursor.rowcount
+
+ stmt = self._script_pack_object_state
+ self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+ removed_states = cursor.rowcount
+
+ # Terminate prev_tid chains
+ stmt = """
+ UPDATE object_state SET prev_tid = 0
+ WHERE prev_tid = %(tid)s
+ AND tid <= %(pack_tid)s
+ """
+ self.runner.run_script_stmt(cursor, stmt,
+ {'pack_tid': pack_tid, 'tid': tid})
+
+ stmt = """
+ SELECT pack_state.zoid
+ FROM pack_state
+ WHERE pack_state.tid = %(tid)s
+ """
+ self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+ for (oid,) in cursor:
+ packed_list.append((oid, tid))
+
+ # Find out whether the transaction is empty
+ stmt = self._script_transaction_has_data
+ self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+ empty = not list(cursor)
+
+ # mark the transaction packed and possibly empty
+ if empty:
+ clause = 'empty = %(TRUE)s'
+ state = 'empty'
+ else:
+ clause = 'empty = %(FALSE)s'
+ state = 'not empty'
+ stmt = "UPDATE transaction SET packed = %(TRUE)s, " + clause
+ stmt += " WHERE tid = %(tid)s"
+ self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+
+ log.debug(
+ "pack: transaction %d (%s): removed %d object(s) and %d state(s)",
+ tid, state, removed_objects, removed_states)
+
+
+ def _pack_cleanup(self, conn, cursor):
+ """Remove unneeded table rows after packing"""
+ # commit the work done so far
+ conn.commit()
+ self.locker.release_commit_lock(cursor)
+ self.locker.hold_commit_lock(cursor)
+ log.info("pack: cleaning up")
+
+ log.debug("pack: removing unused object references")
+ stmt = self._script_pack_object_ref
+ self.runner.run_script(cursor, stmt)
+
+ log.debug("pack: removing empty packed transactions")
+ stmt = """
+ DELETE FROM transaction
+ WHERE packed = %(TRUE)s
+ AND empty = %(TRUE)s
+ """
+ self.runner.run_script_stmt(cursor, stmt)
+
+ # perform cleanup that does not require the commit lock
+ conn.commit()
+ self.locker.release_commit_lock(cursor)
+
+ log.debug("pack: clearing temporary pack state")
+ for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
+ stmt = '%(TRUNCATE)s ' + _table
+ self.runner.run_script_stmt(cursor, stmt)
+
+
+class MySQLHistoryPreservingPackUndo(HistoryPreservingPackUndo):
+
+ # Work around a MySQL performance bug by avoiding an expensive subquery.
+ # See: http://mail.zope.org/pipermail/zodb-dev/2008-May/011880.html
+ # http://bugs.mysql.com/bug.php?id=28257
+ _script_create_temp_pack_visit = """
+ CREATE TEMPORARY TABLE temp_pack_visit (
+ zoid BIGINT NOT NULL,
+ keep_tid BIGINT
+ );
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
+ CREATE TEMPORARY TABLE temp_pack_child (
+ zoid BIGINT NOT NULL
+ );
+ CREATE UNIQUE INDEX temp_pack_child_zoid ON temp_pack_child (zoid);
+ """
+
+ # Note: UPDATE must be the last statement in the script
+ # because it returns a value.
+ _script_pre_pack_follow_child_refs = """
+ %(TRUNCATE)s temp_pack_child;
+
+ INSERT INTO temp_pack_child
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN temp_pack_visit USING (zoid)
+ WHERE object_ref.tid >= temp_pack_visit.keep_tid;
+
+ -- MySQL-specific syntax for table join in update
+ UPDATE pack_object, temp_pack_child SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND pack_object.zoid = temp_pack_child.zoid;
+ """
+
+ # MySQL optimizes deletion far better when using a join syntax.
+ _script_pack_current_object = """
+ DELETE FROM current_object
+ USING current_object
+ JOIN pack_state USING (zoid, tid)
+ WHERE current_object.tid = %(tid)s
+ """
+
+ _script_pack_object_state = """
+ DELETE FROM object_state
+ USING object_state
+ JOIN pack_state USING (zoid, tid)
+ WHERE object_state.tid = %(tid)s
+ """
+
+ _script_pack_object_ref = """
+ DELETE FROM object_refs_added
+ USING object_refs_added
+ JOIN transaction USING (tid)
+ WHERE transaction.empty = true;
+
+ DELETE FROM object_ref
+ USING object_ref
+ JOIN transaction USING (tid)
+ WHERE transaction.empty = true
+ """
+
+ def open_for_pre_pack(self):
+ """Open a connection to be used for the pre-pack phase.
+ Returns (conn, cursor).
+
+ This overrides a method.
+ """
+ conn, cursor = self.connmanager.open(transaction_mode=None)
+ try:
+ # This phase of packing works best with transactions
+ # disabled. It changes no user-facing data.
+ conn.autocommit(True)
+ return conn, cursor
+ except:
+ self.connmanager.close(conn, cursor)
+ raise
+
+
+class OracleHistoryPreservingPackUndo(HistoryPreservingPackUndo):
+
+ _script_choose_pack_transaction = """
+ SELECT MAX(tid)
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND packed = 'N'
+ """
+
+ _script_create_temp_pack_visit = None
+ _script_create_temp_undo = None
+ _script_reset_temp_undo = "DELETE FROM temp_undo"
+
+ _script_transaction_has_data = """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid = %(tid)s
+ """
+
+
+class HistoryFreePackUndo(PackUndo):
+ implements(IPackUndo)
+
+ keep_history = False
+
+ _script_create_temp_pack_visit = """
+ CREATE TEMPORARY TABLE temp_pack_visit (
+ zoid BIGINT NOT NULL,
+ keep_tid BIGINT
+ );
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+ """
+
+ _script_pre_pack_follow_child_refs = """
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN temp_pack_visit USING (zoid)
+ )
+ """
+
+ def verify_undoable(self, cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn."""
+ raise UndoError("Undo is not supported by this storage")
+
+ def undo(self, cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the list of OIDs undone.
+ """
+ raise UndoError("Undo is not supported by this storage")
+
+ def choose_pack_transaction(self, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ return 1
+
+
+ def pre_pack(self, pack_tid, get_references, options):
+ """Decide what the garbage collector should delete.
+
+ pack_tid is ignored.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+
+ options is an instance of relstorage.Options.
+ The options.pack_gc flag indicates whether to run garbage collection.
+ If pack_gc is false, this method does nothing.
+ """
+ if not options.pack_gc:
+ log.warning("pre_pack: garbage collection is disabled on a "
+ "history-free storage, so doing nothing")
+ return
+
+ conn, cursor = self.open_for_pre_pack()
+ try:
+ try:
+ self._pre_pack_main(conn, cursor, get_references)
+ except:
+ log.exception("pre_pack: failed")
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ log.info("pre_pack: finished successfully")
+ finally:
+ self.connmanager.close(conn, cursor)
+
+
+ def _pre_pack_main(self, conn, cursor, get_references):
+ """Determine what to garbage collect.
+ """
+ stmt = self._script_create_temp_pack_visit
+ if stmt:
+ self.runner.run_script(cursor, stmt)
+
+ self.fill_object_refs(conn, cursor, get_references)
+
+ log.info("pre_pack: filling the pack_object table")
+ # Fill the pack_object table with all known OIDs.
+ stmt = """
+ %(TRUNCATE)s pack_object;
+
+ INSERT INTO pack_object (zoid, keep_tid)
+ SELECT zoid, tid
+ FROM object_state;
+
+ -- Keep the root object
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE zoid = 0;
+ """
+ self.runner.run_script(cursor, stmt)
+
+ # Set the 'keep' flags in pack_object
+ self._visit_all_references(cursor)
+
+
+ def pack(self, pack_tid, options, sleep=None, packed_func=None):
+ """Run garbage collection.
+
+ Requires the information provided by _pre_gc.
+ """
+
+ # Read committed mode is sufficient.
+ conn, cursor = self.connmanager.open()
+ try:
+ try:
+ stmt = """
+ SELECT zoid, keep_tid
+ FROM pack_object
+ WHERE keep = %(FALSE)s
+ """
+ self.runner.run_script_stmt(cursor, stmt)
+ to_remove = list(cursor)
+
+ log.info("pack: will remove %d object(s)", len(to_remove))
+
+ # Hold the commit lock while packing to prevent deadlocks.
+ # Pack in small batches of transactions in order to minimize
+ # the interruption of concurrent write operations.
+ start = time.time()
+ packed_list = []
+ self.locker.hold_commit_lock(cursor)
+
+ for item in to_remove:
+ oid, tid = item
+ stmt = """
+ DELETE FROM object_state
+ WHERE zoid = %(oid)s
+ AND tid = %(tid)s
+ """
+ self.runner.run_script_stmt(
+ cursor, stmt, {'oid': oid, 'tid': tid})
+ packed_list.append(item)
+
+ if time.time() >= start + options.pack_batch_timeout:
+ conn.commit()
+ if packed_func is not None:
+ for oid, tid in packed_list:
+ packed_func(oid, tid)
+ del packed_list[:]
+ self.locker.release_commit_lock(cursor)
+ self._pause_pack(sleep, options, start)
+ self.locker.hold_commit_lock(cursor)
+ start = time.time()
+
+ if packed_func is not None:
+ for oid, tid in packed_list:
+ packed_func(oid, tid)
+ packed_list = None
+
+ self._pack_cleanup(conn, cursor)
+
+ except:
+ log.exception("pack: failed")
+ conn.rollback()
+ raise
+
+ else:
+ log.info("pack: finished successfully")
+ conn.commit()
+
+ finally:
+ self.connmanager.close(conn, cursor)
+
+
+ def _pack_cleaup(self, conn, cursor):
+ # commit the work done so far
+ conn.commit()
+ self.locker.release_commit_lock(cursor)
+ self.locker.hold_commit_lock(cursor)
+ log.info("pack: cleaning up")
+
+ stmt = """
+ DELETE FROM object_refs_added
+ WHERE tid NOT IN (
+ SELECT DISTINCT tid
+ FROM object_state
+ );
+
+ DELETE FROM object_ref
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %(FALSE)s
+ );
+
+ %(TRUNCATE)s pack_object
+ """
+ self.runner.run_script(cursor, stmt)
Added: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/poller.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,83 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.adapters.interfaces import IPoller
+from zope.interface import implements
+
+class Poller:
+ """Database change notification poller"""
+ implements(IPoller)
+
+ def __init__(self, poll_query, keep_history, runner):
+ self.poll_query = poll_query
+ self.keep_history = keep_history
+ self.runner = runner
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid).
+ """
+ # find out the tid of the most recent transaction.
+ cursor.execute(self.poll_query)
+ new_polled_tid = cursor.fetchone()[0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ if self.keep_history:
+ stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
+ else:
+ stmt = "SELECT 1 FROM object_state WHERE tid <= %(tid)s LIMIT 1"
+ cursor.execute(intern(stmt % self.runner.script_vars),
+ {'tid': prev_polled_tid})
+ rows = cursor.fetchall()
+ if not rows:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache needs to be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ if ignore_tid is None:
+ stmt = """
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(tid)s
+ """
+ cursor.execute(intern(stmt % self.runner.script_vars),
+ {'tid': prev_polled_tid})
+ else:
+ stmt = """
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(tid)s
+ AND tid != %(self_tid)s
+ """
+ cursor.execute(intern(stmt % self.runner.script_vars),
+ {'tid': prev_polled_tid, 'self_tid': ignore_tid})
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
+
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -13,178 +13,133 @@
##############################################################################
"""PostgreSQL adapter for RelStorage."""
-from base64 import decodestring, encodestring
import logging
-import psycopg2, psycopg2.extensions
-import re
-from ZODB.POSException import StorageError
+import psycopg2
+import psycopg2.extensions
-from relstorage.adapters.historypreserving import HistoryPreservingAdapter
+from relstorage.adapters.connmanager import AbstractConnectionManager
+from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
+from relstorage.adapters.loadstore import HistoryPreservingPostgreSQLLoadStore
+from relstorage.adapters.locker import PostgreSQLLocker
+from relstorage.adapters.packundo import HistoryPreservingPackUndo
+from relstorage.adapters.poller import Poller
+from relstorage.adapters.schema import HistoryPreservingPostgreSQLSchema
+from relstorage.adapters.scriptrunner import ScriptRunner
+from relstorage.adapters.stats import PostgreSQLStats
+from relstorage.adapters.txncontrol import PostgreSQLTransactionControl
-log = logging.getLogger("relstorage.adapters.postgresql")
+log = logging.getLogger(__name__)
# disconnected_exceptions contains the exception types that might be
# raised when the connection to the database has been broken.
disconnected_exceptions = (psycopg2.OperationalError, psycopg2.InterfaceError)
-class PostgreSQLAdapter(HistoryPreservingAdapter):
+class PostgreSQLAdapter(object):
"""PostgreSQL adapter for RelStorage."""
+ keep_history = True
+
def __init__(self, dsn=''):
- self._dsn = dsn
+ self.connmanager = Psycopg2ConnectionManager(dsn)
+ self.runner = ScriptRunner()
+ self.locker = PostgreSQLLocker((psycopg2.DatabaseError,))
+ self.schema = HistoryPreservingPostgreSQLSchema(
+ locker=self.locker,
+ connmanager=self.connmanager,
+ )
+ self.loadstore = HistoryPreservingPostgreSQLLoadStore(
+ connmanager=self.connmanager,
+ disconnected_exceptions=disconnected_exceptions,
+ )
+ self.txncontrol = PostgreSQLTransactionControl()
+ self.poller = Poller(
+ poll_query="EXECUTE get_latest_tid",
+ keep_history=True,
+ runner=self.runner,
+ )
+ self.packundo = HistoryPreservingPackUndo(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ locker=self.locker,
+ )
+ self.dbiter = HistoryPreservingDatabaseIterator(
+ runner=self.runner,
+ )
+ self.stats = PostgreSQLStats(
+ connmanager=self.connmanager,
+ )
- def create_schema(self, cursor):
- """Create the database tables."""
- stmt = """
- CREATE TABLE commit_lock ();
+ self.open = self.connmanager.open
+ self.close = self.connmanager.close
- -- The list of all transactions in the database
- CREATE TABLE transaction (
- tid BIGINT NOT NULL PRIMARY KEY,
- packed BOOLEAN NOT NULL DEFAULT FALSE,
- empty BOOLEAN NOT NULL DEFAULT FALSE,
- username BYTEA NOT NULL,
- description BYTEA NOT NULL,
- extension BYTEA
- );
+ self.hold_commit_lock = self.locker.hold_commit_lock
+ self.release_commit_lock = self.locker.release_commit_lock
+ self.hold_pack_lock = self.locker.hold_pack_lock
+ self.release_pack_lock = self.locker.release_pack_lock
- -- Create a special transaction to represent object creation. This
- -- row is often referenced by object_state.prev_tid, but never by
- -- object_state.tid.
- INSERT INTO transaction (tid, username, description)
- VALUES (0, 'system', 'special transaction for object creation');
+ self.create_schema = self.schema.create
+ self.prepare_schema = self.schema.prepare
+ self.zap_all = self.schema.zap_all
+ self.drop_all = self.schema.drop_all
- CREATE SEQUENCE zoid_seq;
+ self.open_for_load = self.loadstore.open_for_load
+ self.restart_load = self.loadstore.restart_load
+ self.get_current_tid = self.loadstore.get_current_tid
+ self.load_current = self.loadstore.load_current
+ self.load_revision = self.loadstore.load_revision
+ self.exists = self.loadstore.exists
+ self.load_before = self.loadstore.load_before
+ self.get_object_tid_after = self.loadstore.get_object_tid_after
- -- All object states in all transactions. Note that md5 and state
- -- can be null to represent object uncreation.
- CREATE TABLE object_state (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL REFERENCES transaction
- CHECK (tid > 0),
- PRIMARY KEY (zoid, tid),
- prev_tid BIGINT NOT NULL REFERENCES transaction,
- md5 CHAR(32),
- state BYTEA
- );
- CREATE INDEX object_state_tid ON object_state (tid);
- CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+ self.open_for_store = self.loadstore.open_for_store
+ self.restart_store = self.loadstore.restart_store
+ self.store_temp = self.loadstore.store_temp
+ self.replace_temp = self.loadstore.replace_temp
+ self.restore = self.loadstore.restore
+ self.detect_conflict = self.loadstore.detect_conflict
+ self.move_from_temp = self.loadstore.move_from_temp
+ self.update_current = self.loadstore.update_current
+ self.set_min_oid = self.loadstore.set_min_oid
+ self.new_oid = self.loadstore.new_oid
- -- Pointers to the current object state
- CREATE TABLE current_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- tid BIGINT NOT NULL,
- FOREIGN KEY (zoid, tid) REFERENCES object_state
- );
- CREATE INDEX current_object_tid ON current_object (tid);
+ self.get_tid_and_time = self.txncontrol.get_tid_and_time
+ self.add_transaction = self.txncontrol.add_transaction
+ self.commit_phase1 = self.txncontrol.commit_phase1
+ self.commit_phase2 = self.txncontrol.commit_phase2
+ self.abort = self.txncontrol.abort
- -- A list of referenced OIDs from each object_state.
- -- This table is populated as needed during packing.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because rows in object_state
- -- are never modified once committed, and rows are removed
- -- from object_state only by packing.
- CREATE TABLE object_ref (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL,
- to_zoid BIGINT NOT NULL,
- PRIMARY KEY (tid, zoid, to_zoid)
- );
+ self.poll_invalidations = self.poller.poll_invalidations
- -- The object_refs_added table tracks whether object_refs has
- -- been populated for all states in a given transaction.
- -- An entry is added only when the work is finished.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because object states
- -- are never added to a transaction once committed, and
- -- rows are removed from the transaction table only by
- -- packing.
- CREATE TABLE object_refs_added (
- tid BIGINT NOT NULL PRIMARY KEY
- );
+ self.fill_object_refs = self.packundo.fill_object_refs
+ self.open_for_pre_pack = self.packundo.open_for_pre_pack
+ self.choose_pack_transaction = self.packundo.choose_pack_transaction
+ self.pre_pack = self.packundo.pre_pack
+ self.pack = self.packundo.pack
+ self.verify_undoable = self.packundo.verify_undoable
+ self.undo = self.packundo.undo
- -- Temporary state during packing:
- -- The list of objects to pack. If keep is false,
- -- the object and all its revisions will be removed.
- -- If keep is true, instead of removing the object,
- -- the pack operation will cut the object's history.
- -- The keep_tid field specifies the oldest revision
- -- of the object to keep.
- -- The visited flag is set when pre_pack is visiting an object's
- -- references, and remains set.
- CREATE TABLE pack_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- keep BOOLEAN NOT NULL,
- keep_tid BIGINT NOT NULL,
- visited BOOLEAN NOT NULL DEFAULT FALSE
- );
- CREATE INDEX pack_object_keep_false ON pack_object (zoid)
- WHERE keep = false;
- CREATE INDEX pack_object_keep_true ON pack_object (visited)
- WHERE keep = true;
+ self.iter_objects = self.dbiter.iter_objects
+ self.iter_transactions = self.dbiter.iter_transactions
+ self.iter_transactions_range = self.dbiter.iter_transactions_range
+ self.iter_object_history = self.dbiter.iter_object_history
- -- Temporary state during packing: the list of object states to pack.
- CREATE TABLE pack_state (
- tid BIGINT NOT NULL,
- zoid BIGINT NOT NULL,
- PRIMARY KEY (tid, zoid)
- );
+ self.get_object_count = self.stats.get_object_count
+ self.get_db_size = self.stats.get_db_size
- -- Temporary state during packing: the list of transactions that
- -- have at least one object state to pack.
- CREATE TABLE pack_state_tid (
- tid BIGINT NOT NULL PRIMARY KEY
- );
- """
- cursor.execute(stmt)
- if not self._pg_has_advisory_locks(cursor):
- cursor.execute("CREATE TABLE pack_lock ()")
+class Psycopg2ConnectionManager(AbstractConnectionManager):
+ isolation_read_committed = (
+ psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ isolation_serializable = (
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
- def prepare_schema(self):
- """Create the database schema if it does not already exist."""
- def callback(conn, cursor):
- cursor.execute("""
- SELECT tablename
- FROM pg_tables
- WHERE tablename = 'object_state'
- """)
- if not cursor.rowcount:
- self.create_schema(cursor)
- self._open_and_call(callback)
+ close_exceptions = disconnected_exceptions
- def zap_all(self):
- """Clear all data out of the database."""
- def callback(conn, cursor):
- cursor.execute("""
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- DELETE FROM transaction;
- -- Create a special transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, 'system', 'special transaction for object creation');
- ALTER SEQUENCE zoid_seq RESTART WITH 1;
- """)
- self._open_and_call(callback)
+ def __init__(self, dsn):
+ self._dsn = dsn
- def drop_all(self):
- """Drop all tables and sequences."""
- def callback(conn, cursor):
- cursor.execute("SELECT tablename FROM pg_tables")
- existent = set([name for (name,) in cursor])
- for tablename in ('pack_state_tid', 'pack_state',
- 'pack_object', 'object_refs_added', 'object_ref',
- 'current_object', 'object_state', 'transaction',
- 'commit_lock', 'pack_lock'):
- if tablename in existent:
- cursor.execute("DROP TABLE %s" % tablename)
- cursor.execute("DROP SEQUENCE zoid_seq")
- self._open_and_call(callback)
-
def open(self,
isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
"""Open a database connection and return (conn, cursor)."""
@@ -198,397 +153,3 @@
raise
return conn, cursor
- def close(self, conn, cursor):
- """Close a connection and cursor, ignoring certain errors.
- """
- for obj in (cursor, conn):
- if obj is not None:
- try:
- obj.close()
- except disconnected_exceptions:
- pass
-
- def _pg_version(self, cursor):
- """Return the (major, minor) version of PostgreSQL"""
- cursor.execute("SELECT version()")
- v = cursor.fetchone()[0]
- m = re.search(r"([0-9]+)[.]([0-9]+)", v)
- if m is None:
- raise AssertionError("Unable to detect PostgreSQL version: " + v)
- else:
- return int(m.group(1)), int(m.group(2))
-
- def _pg_has_advisory_locks(self, cursor):
- """Return true if this version of PostgreSQL supports advisory locks"""
- return self._pg_version(cursor) >= (8, 2)
-
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.open(
- psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
- stmt = """
- PREPARE get_latest_tid AS
- SELECT tid
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- """
- cursor.execute(stmt)
- return conn, cursor
-
- def restart_load(self, cursor):
- """Reinitialize a connection for loading objects."""
- try:
- cursor.connection.rollback()
- except disconnected_exceptions, e:
- raise StorageError(e)
-
- def get_object_count(self):
- """Returns the number of objects in the database"""
- # do later
- return 0
-
- def get_db_size(self):
- """Returns the approximate size of the database in bytes"""
- def callback(conn, cursor):
- cursor.execute("SELECT pg_database_size(current_database())")
- return cursor.fetchone()[0]
- return self._open_and_call(callback)
-
- def get_current_tid(self, cursor, oid):
- """Returns the current integer tid for an object.
-
- oid is an integer. Returns None if object does not exist.
- """
- cursor.execute("""
- SELECT tid
- FROM current_object
- WHERE zoid = %s
- """, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- return None
-
- def load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
-
- oid is an integer. Returns (None, None) if object does not exist.
- """
- cursor.execute("""
- SELECT encode(state, 'base64'), tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = %s
- """, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- state64, tid = cursor.fetchone()
- if state64 is not None:
- state = decodestring(state64)
- else:
- # This object's creation has been undone
- state = None
- return state, tid
- else:
- return None, None
-
- def load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
-
- Returns None if no such state exists.
- """
- cursor.execute("""
- SELECT encode(state, 'base64')
- FROM object_state
- WHERE zoid = %s
- AND tid = %s
- """, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- (state64,) = cursor.fetchone()
- if state64 is not None:
- return decodestring(state64)
- return None
-
- def exists(self, cursor, oid):
- """Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
- return cursor.rowcount
-
- def load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- cursor.execute("""
- SELECT encode(state, 'base64'), tid
- FROM object_state
- WHERE zoid = %s
- AND tid < %s
- ORDER BY tid DESC
- LIMIT 1
- """, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- state64, tid = cursor.fetchone()
- if state64 is not None:
- state = decodestring(state64)
- else:
- # The object's creation has been undone
- state = None
- return state, tid
- else:
- return None, None
-
- def get_object_tid_after(self, cursor, oid, tid):
- """Returns the tid of the next change after an object revision.
-
- Returns None if no later state exists.
- """
- stmt = """
- SELECT tid
- FROM object_state
- WHERE zoid = %s
- AND tid > %s
- ORDER BY tid
- LIMIT 1
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- else:
- return None
-
- def _make_temp_table(self, cursor):
- """Create the temporary table for storing objects"""
- stmt = """
- CREATE TEMPORARY TABLE temp_store (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state BYTEA
- ) ON COMMIT DROP;
- CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
- """
- cursor.execute(stmt)
-
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.open()
- try:
- self._make_temp_table(cursor)
- return conn, cursor
- except:
- self.close(conn, cursor)
- raise
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
- try:
- cursor.connection.rollback()
- self._make_temp_table(cursor)
- except disconnected_exceptions, e:
- raise StorageError(e)
-
- def store_temp(self, cursor, oid, prev_tid, data):
- """Store an object in the temporary table."""
- md5sum = self.md5sum(data)
- stmt = """
- DELETE FROM temp_store WHERE zoid = %s;
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (%s, %s, %s, decode(%s, 'base64'))
- """
- cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
-
- def replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table."""
- md5sum = self.md5sum(data)
- stmt = """
- UPDATE temp_store SET
- prev_tid = %s,
- md5 = %s,
- state = decode(%s, 'base64')
- WHERE zoid = %s
- """
- cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
-
- def restore(self, cursor, oid, tid, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- md5sum = self.md5sum(data)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s,
- COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
- %s, decode(%s, 'base64'))
- """
- if data is not None:
- data = encodestring(data)
- cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-
- def start_commit(self, cursor):
- """Prepare to commit."""
- # Hold commit_lock to prevent concurrent commits
- # (for as short a time as possible).
- # Lock transaction and current_object in share mode to ensure
- # conflict detection has the most current data.
- cursor.execute("""
- LOCK TABLE commit_lock IN EXCLUSIVE MODE;
- LOCK TABLE transaction IN SHARE MODE;
- LOCK TABLE current_object IN SHARE MODE
- """)
-
- def get_tid_and_time(self, cursor):
- """Returns the most recent tid and the current database time.
-
- The database time is the number of seconds since the epoch.
- """
- cursor.execute("""
- SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- """)
- assert cursor.rowcount == 1
- return cursor.fetchone()
-
- def add_transaction(self, cursor, tid, username, description, extension,
- packed=False):
- """Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (%s, %s,
- decode(%s, 'base64'), decode(%s, 'base64'), decode(%s, 'base64'))
- """
- cursor.execute(stmt, (tid, packed,
- encodestring(username), encodestring(description),
- encodestring(extension)))
-
- def detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- encode(temp_store.state, 'base64')
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- LIMIT 1
- """
- cursor.execute(stmt)
- if cursor.rowcount:
- oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
- return oid, prev_tid, attempted_prev_tid, decodestring(data)
- return None
-
- def move_from_temp(self, cursor, tid):
- """Moved the temporarily stored objects to permanent storage.
-
- Returns the list of oids stored.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT zoid, %s, prev_tid, md5, state
- FROM temp_store
- """
- cursor.execute(stmt, (tid,))
-
- stmt = """
- SELECT zoid FROM temp_store
- """
- cursor.execute(stmt)
- return [oid for (oid,) in cursor]
-
- def update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- cursor.execute("""
- -- Insert objects created in this transaction into current_object.
- INSERT INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = %(tid)s
- AND prev_tid = 0;
-
- -- Change existing objects. To avoid deadlocks,
- -- update in OID order.
- UPDATE current_object SET tid = %(tid)s
- WHERE zoid IN (
- SELECT zoid FROM object_state
- WHERE tid = %(tid)s
- AND prev_tid != 0
- ORDER BY zoid
- )
- """, {'tid': tid})
-
- def set_min_oid(self, cursor, oid):
- """Ensure the next OID is at least the given OID."""
- cursor.execute("""
- SELECT CASE WHEN %s > nextval('zoid_seq')
- THEN setval('zoid_seq', %s)
- ELSE 0
- END
- """, (oid, oid))
-
- def commit_phase1(self, cursor, tid):
- """Begin a commit. Returns the transaction name.
-
- This method should guarantee that commit_phase2() will succeed,
- meaning that if commit_phase2() would raise any error, the error
- should be raised in commit_phase1() instead.
- """
- return '-'
-
- def commit_phase2(self, cursor, txn):
- """Final transaction commit."""
- cursor.connection.commit()
-
- def abort(self, cursor, txn=None):
- """Abort the commit. If txn is not None, phase 1 is also aborted."""
- cursor.connection.rollback()
-
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "SELECT NEXTVAL('zoid_seq')"
- cursor.execute(stmt)
- return cursor.fetchone()[0]
-
- def hold_pack_lock(self, cursor):
- """Try to acquire the pack lock.
-
- Raise an exception if packing or undo is already in progress.
- """
- if self._pg_has_advisory_locks(cursor):
- cursor.execute("SELECT pg_try_advisory_lock(1)")
- locked = cursor.fetchone()[0]
- if not locked:
- raise StorageError('A pack or undo operation is in progress')
- else:
- # b/w compat
- try:
- cursor.execute("LOCK pack_lock IN EXCLUSIVE MODE NOWAIT")
- except psycopg2.DatabaseError:
- raise StorageError('A pack or undo operation is in progress')
-
- def release_pack_lock(self, cursor):
- """Release the pack lock."""
- if self._pg_has_advisory_locks(cursor):
- cursor.execute("SELECT pg_advisory_unlock(1)")
- # else no action needed since the lock will be released at txn commit
-
- _poll_query = "EXECUTE get_latest_tid"
Added: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/schema.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,506 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Database schema installers
+"""
+from relstorage.adapters.interfaces import ISchemaInstaller
+from zope.interface import implements
+import time
+
+class HistoryPreservingPostgreSQLSchema(object):
+ implements(ISchemaInstaller)
+
+ script = """
+ CREATE TABLE commit_lock ();
+
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid BIGINT NOT NULL PRIMARY KEY,
+ packed BOOLEAN NOT NULL DEFAULT FALSE,
+ empty BOOLEAN NOT NULL DEFAULT FALSE,
+ username BYTEA NOT NULL,
+ description BYTEA NOT NULL,
+ extension BYTEA
+ );
+
+ -- Create a special transaction to represent object creation. This
+ -- row is often referenced by object_state.prev_tid, but never by
+ -- object_state.tid.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0, 'system', 'special transaction for object creation');
+
+ CREATE SEQUENCE zoid_seq;
+
+ -- All object states in all transactions. Note that md5 and state
+ -- can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL REFERENCES transaction
+ CHECK (tid > 0),
+ PRIMARY KEY (zoid, tid),
+ prev_tid BIGINT NOT NULL REFERENCES transaction,
+ md5 CHAR(32),
+ state BYTEA
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+ CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state
+ );
+ CREATE INDEX current_object_tid ON current_object (tid);
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid, to_zoid)
+ );
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is false,
+ -- the object and all its revisions will be removed.
+ -- If keep is true, instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- The keep_tid field specifies the oldest revision
+ -- of the object to keep.
+ -- The visited flag is set when pre_pack is visiting an object's
+ -- references, and remains set.
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL,
+ keep_tid BIGINT NOT NULL,
+ visited BOOLEAN NOT NULL DEFAULT FALSE
+ );
+ CREATE INDEX pack_object_keep_false ON pack_object (zoid)
+ WHERE keep = false;
+ CREATE INDEX pack_object_keep_true ON pack_object (visited)
+ WHERE keep = true;
+
+ -- Temporary state during packing: the list of object states to pack.
+ CREATE TABLE pack_state (
+ tid BIGINT NOT NULL,
+ zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid)
+ );
+
+ -- Temporary state during packing: the list of transactions that
+ -- have at least one object state to pack.
+ CREATE TABLE pack_state_tid (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+ """
+
+ def __init__(self, locker, connmanager):
+ self.locker = locker
+ self.connmanager = connmanager
+
+ def create(self, cursor):
+ """Create the database tables."""
+ cursor.execute(self.script)
+ self.locker.create_pack_lock(cursor)
+
+ def prepare(self):
+ """Create the database schema if it does not already exist."""
+ def callback(conn, cursor):
+ cursor.execute("""
+ SELECT tablename
+ FROM pg_tables
+ WHERE tablename = 'object_state'
+ """)
+ if not cursor.rowcount:
+ self.create(cursor)
+ self.connmanager.open_and_call(callback)
+
+ def zap_all(self):
+ """Clear all data out of the database."""
+ def callback(conn, cursor):
+ cursor.execute("""
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ DELETE FROM transaction;
+ -- Create a special transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
+ ALTER SEQUENCE zoid_seq RESTART WITH 1;
+ """)
+ self.connmanager.open_and_call(callback)
+
+ def drop_all(self):
+ """Drop all tables and sequences."""
+ def callback(conn, cursor):
+ cursor.execute("SELECT tablename FROM pg_tables")
+ existent = set([name for (name,) in cursor])
+ for tablename in ('pack_state_tid', 'pack_state',
+ 'pack_object', 'object_refs_added', 'object_ref',
+ 'current_object', 'object_state', 'transaction',
+ 'commit_lock', 'pack_lock'):
+ if tablename in existent:
+ cursor.execute("DROP TABLE %s" % tablename)
+ cursor.execute("DROP SEQUENCE zoid_seq")
+ self.connmanager.open_and_call(callback)
+
+
+class HistoryPreservingMySQLSchema(object):
+ implements(ISchemaInstaller)
+
+ script = """
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid BIGINT NOT NULL PRIMARY KEY,
+ packed BOOLEAN NOT NULL DEFAULT FALSE,
+ empty BOOLEAN NOT NULL DEFAULT FALSE,
+ username BLOB NOT NULL,
+ description BLOB NOT NULL,
+ extension BLOB
+ ) ENGINE = InnoDB;
+
+ -- Create a special transaction to represent object creation. This
+ -- row is often referenced by object_state.prev_tid, but never by
+ -- object_state.tid.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0, 'system', 'special transaction for object creation');
+
+ -- All OIDs allocated in the database. Note that this table
+ -- is purposely non-transactional.
+ CREATE TABLE new_oid (
+ zoid BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
+ ) ENGINE = MyISAM;
+
+ -- All object states in all transactions. Note that md5 and state
+ -- can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL REFERENCES transaction,
+ PRIMARY KEY (zoid, tid),
+ prev_tid BIGINT NOT NULL REFERENCES transaction,
+ md5 CHAR(32) CHARACTER SET ascii,
+ state LONGBLOB,
+ CHECK (tid > 0)
+ ) ENGINE = InnoDB;
+ CREATE INDEX object_state_tid ON object_state (tid);
+ CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state (zoid, tid)
+ ) ENGINE = InnoDB;
+ CREATE INDEX current_object_tid ON current_object (tid);
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid, to_zoid)
+ ) ENGINE = MyISAM;
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ ) ENGINE = MyISAM;
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is false,
+ -- the object and all its revisions will be removed.
+ -- If keep is true, instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- The keep_tid field specifies the oldest revision
+ -- of the object to keep.
+ -- The visited flag is set when pre_pack is visiting an object's
+ -- references, and remains set.
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL,
+ keep_tid BIGINT NOT NULL,
+ visited BOOLEAN NOT NULL DEFAULT FALSE
+ ) ENGINE = MyISAM;
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+
+ -- Temporary state during packing: the list of object states to pack.
+ CREATE TABLE pack_state (
+ tid BIGINT NOT NULL,
+ zoid BIGINT NOT NULL,
+ PRIMARY KEY (tid, zoid)
+ ) ENGINE = MyISAM;
+
+ -- Temporary state during packing: the list of transactions that
+ -- have at least one object state to pack.
+ CREATE TABLE pack_state_tid (
+ tid BIGINT NOT NULL PRIMARY KEY
+ ) ENGINE = MyISAM;
+ """
+
+ def __init__(self, connmanager, runner):
+ self.connmanager = connmanager
+ self.runner = runner
+
+ def create(self, cursor):
+ """Create the database tables."""
+ self.runner.run_script(cursor, self.script)
+
+ def prepare(self):
+ """Create the database schema if it does not already exist."""
+ def callback(conn, cursor):
+ cursor.execute("SHOW TABLES LIKE 'object_state'")
+ if not cursor.rowcount:
+ self.create(cursor)
+ self.connmanager.open_and_call(callback)
+
+ def zap_all(self):
+ """Clear all data out of the database."""
+ def callback(conn, cursor):
+ stmt = """
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ TRUNCATE new_oid;
+ DELETE FROM transaction;
+ -- Create a transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
+ """
+ self.runner.run_script(cursor, stmt)
+ self.connmanager.open_and_call(callback)
+
+ def drop_all(self):
+ """Drop all tables and sequences."""
+ def callback(conn, cursor):
+ for tablename in ('pack_state_tid', 'pack_state',
+ 'pack_object', 'object_refs_added', 'object_ref',
+ 'current_object', 'object_state', 'new_oid',
+ 'transaction'):
+ cursor.execute("DROP TABLE IF EXISTS %s" % tablename)
+ self.connmanager.open_and_call(callback)
+
+
+class HistoryPreservingOracleSchema(object):
+ implements(ISchemaInstaller)
+
+ script = """
+ CREATE TABLE commit_lock (dummy CHAR);
+
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid NUMBER(20) NOT NULL PRIMARY KEY,
+ packed CHAR DEFAULT 'N' CHECK (packed IN ('N', 'Y')),
+ empty CHAR DEFAULT 'N' CHECK (empty IN ('N', 'Y')),
+ username RAW(500),
+ description RAW(2000),
+ extension RAW(2000)
+ );
+
+ -- Create a special transaction to represent object creation. This
+ -- row is often referenced by object_state.prev_tid, but never by
+ -- object_state.tid.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0,
+ UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
+ UTL_I18N.STRING_TO_RAW(
+ 'special transaction for object creation', 'US7ASCII'));
+
+ CREATE SEQUENCE zoid_seq;
+
+ -- All object states in all transactions.
+ -- md5 and state can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid NUMBER(20) NOT NULL,
+ tid NUMBER(20) NOT NULL REFERENCES transaction
+ CHECK (tid > 0),
+ PRIMARY KEY (zoid, tid),
+ prev_tid NUMBER(20) NOT NULL REFERENCES transaction,
+ md5 CHAR(32),
+ state BLOB
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+ CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ tid NUMBER(20) NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state
+ );
+ CREATE INDEX current_object_tid ON current_object (tid);
+
+ -- States that will soon be stored
+ CREATE GLOBAL TEMPORARY TABLE temp_store (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ prev_tid NUMBER(20) NOT NULL,
+ md5 CHAR(32),
+ state BLOB
+ ) ON COMMIT DELETE ROWS;
+
+ -- During packing, an exclusive lock is held on pack_lock.
+ CREATE TABLE pack_lock (dummy CHAR);
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid NUMBER(20) NOT NULL,
+ tid NUMBER(20) NOT NULL,
+ to_zoid NUMBER(20) NOT NULL,
+ PRIMARY KEY (tid, zoid, to_zoid)
+ );
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid NUMBER(20) NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is 'N',
+ -- the object and all its revisions will be removed.
+ -- If keep is 'Y', instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- The keep_tid field specifies the oldest revision
+ -- of the object to keep.
+ -- The visited flag is set when pre_pack is visiting an object's
+ -- references, and remains set.
+ CREATE TABLE pack_object (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ keep CHAR NOT NULL CHECK (keep IN ('N', 'Y')),
+ keep_tid NUMBER(20) NOT NULL,
+ visited CHAR DEFAULT 'N' NOT NULL CHECK (visited IN ('N', 'Y'))
+ );
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+
+ -- Temporary state during packing: the list of object states to pack.
+ CREATE TABLE pack_state (
+ tid NUMBER(20) NOT NULL,
+ zoid NUMBER(20) NOT NULL,
+ PRIMARY KEY (tid, zoid)
+ );
+
+ -- Temporary state during packing: the list of transactions that
+ -- have at least one object state to pack.
+ CREATE TABLE pack_state_tid (
+ tid NUMBER(20) NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during packing: a list of objects
+ -- whose references need to be examined.
+ CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ keep_tid NUMBER(20)
+ );
+
+ -- Temporary state during undo: a list of objects
+ -- to be undone and the tid of the undone state.
+ CREATE GLOBAL TEMPORARY TABLE temp_undo (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ prev_tid NUMBER(20) NOT NULL
+ );
+ """
+
+ def __init__(self, connmanager, runner):
+ self.connmanager = connmanager
+ self.runner = runner
+
+ def create(self, cursor):
+ """Create the database tables."""
+ self.runner.run_script(cursor, self.script)
+ # Let Oracle catch up with the new data definitions by sleeping.
+ # This reduces the likelihood of spurious ORA-01466 errors.
+ time.sleep(5)
+
+ def prepare(self):
+ """Create the database schema if it does not already exist."""
+ def callback(conn, cursor):
+ cursor.execute("""
+ SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
+ """)
+ if not cursor.fetchall():
+ self.create(cursor)
+ self.connmanager.open_and_call(callback)
+
+ def zap_all(self):
+ """Clear all data out of the database."""
+ def callback(conn, cursor):
+ stmt = """
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ DELETE FROM transaction;
+ -- Create a transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
+ UTL_I18N.STRING_TO_RAW(
+ 'special transaction for object creation', 'US7ASCII'));
+ DROP SEQUENCE zoid_seq;
+ CREATE SEQUENCE zoid_seq;
+ """
+ self.runner.run_script(cursor, stmt)
+ self.connmanager.open_and_call(callback)
+
+ def drop_all(self):
+ """Drop all tables and sequences."""
+ def callback(conn, cursor):
+ for tablename in ('pack_state_tid', 'pack_state',
+ 'pack_object', 'object_refs_added', 'object_ref',
+ 'current_object', 'object_state', 'transaction',
+ 'commit_lock', 'pack_lock',
+ 'temp_store', 'temp_undo', 'temp_pack_visit'):
+ cursor.execute("DROP TABLE %s" % tablename)
+ cursor.execute("DROP SEQUENCE zoid_seq")
+ self.connmanager.open_and_call(callback)
Added: relstorage/trunk/relstorage/adapters/scriptrunner.py
===================================================================
--- relstorage/trunk/relstorage/adapters/scriptrunner.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/scriptrunner.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,161 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.adapters.interfaces import IScriptRunner
+from zope.interface import implements
+import logging
+import re
+
+log = logging.getLogger(__name__)
+
+
+class ScriptRunner(object):
+ implements(IScriptRunner)
+
+ # script_vars contains replacements for parts of scripts.
+ # These are correct for PostgreSQL and MySQL but not for Oracle.
+ script_vars = {
+ 'TRUE': 'TRUE',
+ 'FALSE': 'FALSE',
+ 'OCTET_LENGTH': 'OCTET_LENGTH',
+ 'TRUNCATE': 'TRUNCATE',
+ 'oid': '%(oid)s',
+ 'tid': '%(tid)s',
+ 'pack_tid': '%(pack_tid)s',
+ 'undo_tid': '%(undo_tid)s',
+ 'self_tid': '%(self_tid)s',
+ 'min_tid': '%(min_tid)s',
+ 'max_tid': '%(max_tid)s',
+ }
+
+ def run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ params should be either an empty tuple (no parameters) or
+ a map.
+
+ The input statement is generic and needs to be transformed
+ into a database-specific statement.
+ """
+ stmt = generic_stmt % self.script_vars
+ try:
+ cursor.execute(stmt, generic_params)
+ except:
+ log.warning("script statement failed: %r; parameters: %r",
+ stmt, generic_params)
+ raise
+
+ def run_script(self, cursor, script, params=()):
+ """Execute a series of statements in the database.
+
+ params should be either an empty tuple (no parameters) or
+ a map.
+
+ The statements are transformed by run_script_stmt
+ before execution.
+ """
+ lines = []
+ for line in script.split('\n'):
+ line = line.strip()
+ if not line or line.startswith('--'):
+ continue
+ if line.endswith(';'):
+ line = line[:-1]
+ lines.append(line)
+ stmt = '\n'.join(lines)
+ self.run_script_stmt(cursor, stmt, params)
+ lines = []
+ else:
+ lines.append(line)
+ if lines:
+ stmt = '\n'.join(lines)
+ self.run_script_stmt(cursor, stmt, params)
+
+ def run_many(self, cursor, stmt, items):
+ """Execute a statement repeatedly. Items should be a list of tuples.
+
+ stmt should use '%s' parameter format.
+ """
+ cursor.executemany(stmt, items)
+
+
+class OracleScriptRunner(ScriptRunner):
+
+ script_vars = {
+ 'TRUE': "'Y'",
+ 'FALSE': "'N'",
+ 'OCTET_LENGTH': 'LENGTH',
+ 'TRUNCATE': 'TRUNCATE TABLE',
+ 'oid': ':oid',
+ 'tid': ':tid',
+ 'pack_tid': ':pack_tid',
+ 'undo_tid': ':undo_tid',
+ 'self_tid': ':self_tid',
+ 'min_tid': ':min_tid',
+ 'max_tid': ':max_tid',
+ }
+
+ def run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ params should be either an empty tuple (no parameters) or
+ a map.
+ """
+ if generic_params:
+ # Oracle raises ORA-01036 if the parameter map contains extra keys,
+ # so filter out any unused parameters.
+ tracker = TrackingMap(self.script_vars)
+ stmt = generic_stmt % tracker
+ used = tracker.used
+ params = {}
+ for k, v in generic_params.iteritems():
+ if k in used:
+ params[k] = v
+ else:
+ stmt = generic_stmt % self.script_vars
+ params = ()
+
+ try:
+ cursor.execute(stmt, params)
+ except:
+ log.warning("script statement failed: %r; parameters: %r",
+ stmt, params)
+ raise
+
+ def run_many(self, cursor, stmt, items):
+ """Execute a statement repeatedly. Items should be a list of tuples.
+
+ stmt should use '%s' parameter format.
+ """
+ # replace '%s' with ':n'
+ matches = []
+ def replace(match):
+ matches.append(None)
+ return ':%d' % len(matches)
+ stmt = intern(re.sub('%s', replace, stmt))
+
+ cursor.executemany(stmt, items)
+
+
+class TrackingMap:
+ """Provides values for keys while tracking which keys are accessed."""
+
+ def __init__(self, source):
+ self.source = source
+ self.used = set()
+
+ def __getitem__(self, key):
+ self.used.add(key)
+ return self.source[key]
+
Added: relstorage/trunk/relstorage/adapters/stats.py
===================================================================
--- relstorage/trunk/relstorage/adapters/stats.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/stats.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,93 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Stats implementations
+"""
+
+class PostgreSQLStats(object):
+
+ def __init__(self, connmanager):
+ self.connmanager = connmanager
+
+ def get_object_count(self):
+ """Returns the number of objects in the database"""
+ # do later
+ return 0
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ def callback(conn, cursor):
+ cursor.execute("SELECT pg_database_size(current_database())")
+ return cursor.fetchone()[0]
+ return self.connmanager.open_and_call(callback)
+
+
+class MySQLStats(object):
+
+ def __init__(self, connmanager):
+ self.connmanager = connmanager
+
+ def get_object_count(self):
+ """Returns the number of objects in the database"""
+ # do later
+ return 0
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ conn, cursor = self.connmanager.open()
+ try:
+ cursor.execute("SHOW TABLE STATUS")
+ description = [i[0] for i in cursor.description]
+ rows = list(cursor)
+ finally:
+ self.connmanager.close(conn, cursor)
+ data_column = description.index('Data_length')
+ index_column = description.index('Index_length')
+ return sum([row[data_column] + row[index_column] for row in rows], 0)
+
+
+class OracleStats(object):
+
+ def __init__(self, connmanager):
+ self.connmanager = connmanager
+
+ def get_object_count(self):
+ """Returns the number of objects in the database"""
+ # The tests expect an exact number, but the code below generates
+ # an estimate, so this is disabled for now.
+ if True:
+ return 0
+ else:
+ conn, cursor = self.connmanager.open(
+ self.connmanager.isolation_read_only)
+ try:
+ stmt = """
+ SELECT NUM_ROWS
+ FROM USER_TABLES
+ WHERE TABLE_NAME = 'CURRENT_OBJECT'
+ """
+ cursor.execute(stmt)
+ res = cursor.fetchone()[0]
+ if res is None:
+ res = 0
+ else:
+ res = int(res)
+ return res
+ finally:
+ self.connmanager.close(conn, cursor)
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ # May not be possible without access to the dba_* objects
+ return 0
+
Added: relstorage/trunk/relstorage/adapters/txncontrol.py
===================================================================
--- relstorage/trunk/relstorage/adapters/txncontrol.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/txncontrol.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,182 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""TransactionControl implementations"""
+
+from base64 import encodestring
+from relstorage.adapters.interfaces import ITransactionControl
+from zope.interface import implements
+import logging
+import re
+import time
+
+log = logging.getLogger(__name__)
+
+
+class TransactionControl(object):
+ """Abstract base class"""
+
+ def commit_phase1(self, conn, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ The transaction name must not be None.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+ return '-'
+
+ def commit_phase2(self, conn, cursor, txn):
+ """Final transaction commit.
+
+ txn is the name returned by commit_phase1.
+ """
+ conn.commit()
+
+ def abort(self, conn, cursor, txn=None):
+ """Abort the commit. If txn is not None, phase 1 is also aborted."""
+ conn.rollback()
+
+
+class PostgreSQLTransactionControl(TransactionControl):
+ implements(ITransactionControl)
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ cursor.execute("""
+ SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """)
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+
+ def add_transaction(self, cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (%s, %s,
+ decode(%s, 'base64'), decode(%s, 'base64'), decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (tid, packed,
+ encodestring(username), encodestring(description),
+ encodestring(extension)))
+
+
+class MySQLTransactionControl(TransactionControl):
+ implements(ITransactionControl)
+
+ def __init__(self, Binary):
+ self.Binary = Binary
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ # Lock in share mode to ensure the data being read is up to date.
+ cursor.execute("""
+ SELECT tid, UNIX_TIMESTAMP()
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ LOCK IN SHARE MODE
+ """)
+ assert cursor.rowcount == 1
+ tid, timestamp = cursor.fetchone()
+ # MySQL does not provide timestamps with more than one second
+ # precision. To provide more precision, if the system time is
+ # within one minute of the MySQL time, use the system time instead.
+ now = time.time()
+ if abs(now - timestamp) <= 60.0:
+ timestamp = now
+ return tid, timestamp
+
+ def add_transaction(self, cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (%s, %s, %s, %s, %s)
+ """
+ cursor.execute(stmt, (
+ tid, packed, self.Binary(username),
+ self.Binary(description), self.Binary(extension)))
+
+
+class OracleTransactionControl(TransactionControl):
+ implements(ITransactionControl)
+
+ def __init__(self, Binary, twophase=False):
+ self.Binary = Binary
+ self.twophase = twophase
+
+ def commit_phase1(self, conn, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ The transaction name must not be None.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+ if self.twophase:
+ conn.prepare()
+ return '-'
+
+ def _parse_dsinterval(self, s):
+ """Convert an Oracle dsinterval (as a string) to a float."""
+ mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
+ if not mo:
+ raise ValueError(s)
+ day, hour, min, sec = [float(v) for v in mo.groups()]
+ return day * 86400 + hour * 3600 + min * 60 + sec
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ cursor.execute("""
+ SELECT MAX(tid), TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
+ '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
+ FROM transaction
+ """)
+ tid, now = cursor.fetchone()
+ return tid, self._parse_dsinterval(now)
+
+ def add_transaction(self, cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (:1, :2, :3, :4, :5)
+ """
+ max_desc_len = 2000
+ if len(description) > max_desc_len:
+ log.warning('Trimming description of transaction %s '
+ 'to %d characters', tid, max_desc_len)
+ description = description[:max_desc_len]
+ cursor.execute(stmt, (
+ tid, packed and 'Y' or 'N', self.Binary(username),
+ self.Binary(description), self.Binary(extension)))
+
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/relstorage.py 2009-09-23 21:12:58 UTC (rev 104464)
@@ -554,7 +554,7 @@
# get the commit lock and add the transaction now
cursor = self._store_cursor
packed = (status == 'p')
- adapter.start_commit(cursor)
+ adapter.hold_commit_lock(cursor, ensure_current=True)
tid_int = u64(tid)
try:
adapter.add_transaction(
@@ -581,7 +581,7 @@
adapter = self._adapter
cursor = self._store_cursor
- adapter.start_commit(cursor)
+ adapter.hold_commit_lock(cursor, ensure_current=True)
user, desc, ext = self._ude
# Choose a transaction ID.
@@ -669,6 +669,7 @@
cursor = self._store_cursor
assert cursor is not None
+ conn = self._store_conn
if self._max_stored_oid > self._max_new_oid:
self._adapter.set_min_oid(cursor, self._max_stored_oid + 1)
@@ -678,7 +679,8 @@
serials = self._finish_store()
self._adapter.update_current(cursor, tid_int)
- self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
+ self._prepared_txn = self._adapter.commit_phase1(
+ conn, cursor, tid_int)
if self._txn_blobs:
# We now have a transaction ID, so rename all the blobs
@@ -718,7 +720,9 @@
self._rollback_load_connection()
txn = self._prepared_txn
assert txn is not None
- self._adapter.commit_phase2(self._store_cursor, txn)
+ self._adapter.commit_phase2(
+ self._store_conn, self._store_cursor, txn)
+ self._adapter.release_commit_lock(self._store_cursor)
cache = self._cache_client
if cache is not None:
if cache.incr('commit_count') is None:
@@ -740,7 +744,9 @@
try:
self._rollback_load_connection()
if self._store_cursor is not None:
- self._adapter.abort(self._store_cursor, self._prepared_txn)
+ self._adapter.abort(
+ self._store_conn, self._store_cursor, self._prepared_txn)
+ self._adapter.release_commit_lock(self._store_cursor)
if self._txn_blobs:
for oid, filename in self._txn_blobs.iteritems():
if os.path.exists(filename):
More information about the checkins
mailing list